diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index f847aa96ad45..ed1439afbed9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -5473,8 +5473,18 @@ private void persistMaterializedViewDefinitionMetadataBestEffort(TableConfig tab try { Map taskConfigs = tableConfig.getMaterializedViewTaskConfigs(); if (taskConfigs == null) { - LOGGER.warn("MV table {} has no MaterializedViewTask config; skipping definition metadata persist", - tableNameWithType); + // This MV is not materialized by the built-in MaterializedViewTask. A downstream + // MaterializedViewDdlHandler / task type may materialize it via its own minion runtime; by + // contract (see MaterializedViewDdlHandler) that runtime also owns its definition metadata + // and consistency tracking, so the built-in machinery here intentionally does not manage it. + if (tableConfig.hasMaterializedViewTaskWithDefinedSql()) { + LOGGER.info("MV table {} uses a non-built-in MV task type; its definition metadata is owned " + + "by that task type's runtime — skipping built-in MaterializedViewTask metadata persist", + tableNameWithType); + } else { + LOGGER.warn("MV table {} has no MaterializedViewTask config; skipping definition metadata persist", + tableNameWithType); + } return; } String definedSql = taskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY); @@ -5705,8 +5715,16 @@ private void notifyMaterializedViewConsistencyManagerForTableCreate(TableConfig } Map materializedViewTaskConfigs = tableConfig.getMaterializedViewTaskConfigs(); if (materializedViewTaskConfigs == null) { - LOGGER.warn("MV table {} has no MaterializedViewTask config for consistency registration", - tableConfig.getTableName()); + // Not a built-in MaterializedViewTask MV: a downstream task type materializes it and, by + // contract (see MaterializedViewDdlHandler), owns its own consistency tracking. The built-in + // MaterializedViewConsistencyManager intentionally does not register it here. + if (tableConfig.hasMaterializedViewTaskWithDefinedSql()) { + LOGGER.info("MV table {} uses a non-built-in MV task type; consistency is owned by that task " + + "type's runtime — skipping built-in consistency registration", tableConfig.getTableName()); + } else { + LOGGER.warn("MV table {} has no MaterializedViewTask config for consistency registration", + tableConfig.getTableName()); + } return; } String definedSQL = materializedViewTaskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY); diff --git a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java index 61bd20c45ae1..0a4b791fc5bf 100644 --- a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java +++ b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java @@ -30,7 +30,6 @@ import org.apache.calcite.sql.SqlBasicTypeNameSpec; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -88,9 +87,29 @@ /// Stateless and thread-safe. All entry points are static. public final class DdlCompiler { + /// Pluggable handler for `CREATE MATERIALIZED VIEW` compilation (query validation + task-config + /// routing). Defaults to single-source / single-stage behavior. A distribution that supports + /// richer MV definitions (e.g. a multi-stage-engine MV whose `AS` clause is a JOIN) installs its + /// own handler via [#setMaterializedViewDdlHandler] once at controller startup, before any DDL is + /// served. Volatile because it is read on the DDL request path and may be set from a different + /// (startup) thread; not intended to be swapped while DDL is in flight. + private static volatile MaterializedViewDdlHandler _materializedViewDdlHandler = + new DefaultMaterializedViewDdlHandler(); + private DdlCompiler() { } + /// Installs the [MaterializedViewDdlHandler] used for all subsequent `CREATE MATERIALIZED VIEW` + /// compilations. Call once at controller startup; defaults to [DefaultMaterializedViewDdlHandler]. + public static void setMaterializedViewDdlHandler(MaterializedViewDdlHandler handler) { + _materializedViewDdlHandler = handler; + } + + /// Returns the active materialized-view DDL handler (never null; defaults to single-source SSE). + public static MaterializedViewDdlHandler getMaterializedViewDdlHandler() { + return _materializedViewDdlHandler; + } + /// Parses and compiles a DDL statement using a stateless {@link DdlCompileContext}. /// /// @deprecated Use {@link #compile(String, DdlCompileContext)} and supply a real context. @@ -526,11 +545,14 @@ private static CompiledCreateMaterializedView compileCreateMaterializedView(Stri List warnings = new ArrayList<>(); Map properties = resolveProperties(node.getProperties().getList()); - // Reject JOIN early so the inferer's single-source assumption (and the analyzer's - // downstream check) cannot be violated by a definedSql we haven't validated yet. - rejectJoinInDefinedSql(node.getQuery()); + // The engine (SSE vs MSE) is the registered handler's choice — not the compiler's. Extract the + // verbatim AS-clause text, then let the handler validate it for its target engine (the default + // handler re-compiles it as a single-stage Pinot query; an MSE handler does a multi-stage check). + // Done before column resolution / schema inference so the single-source inferer cannot be fed a + // definition the handler hasn't accepted. + MaterializedViewDdlHandler mvHandler = getMaterializedViewDdlHandler(); String definedSql = extractDefinedSql(originalSql, node.getQuery()); - verifyDefinedSqlIsParseable(definedSql); + mvHandler.validateDefinedQuery(node.getQuery(), definedSql, properties); // Two paths: // 1) Explicit column list — legacy path, will be deprecated once the inferer matures. @@ -539,6 +561,15 @@ private static CompiledCreateMaterializedView compileCreateMaterializedView(Stri // surfaces a clear message when it's absent. List columns; if (node.getColumns().getList().isEmpty()) { + // Schema inference from the AS projection is single-source-only. A handler whose + // definedSQL may be multi-source (e.g. a multi-stage-engine MV) reports that it does not + // support inference, so an explicit column list is required. + if (!mvHandler.supportsSchemaInference(properties)) { + throw new DdlCompilationException( + "CREATE MATERIALIZED VIEW requires an explicit column list for this materialized view; " + + "schema inference from the AS projection is only supported for " + + "single-source materialized views."); + } // Fall back to the request-header database when the DDL itself does not qualify the // MV name — Calcite needs SOME database to resolve `FROM src` against, and the // header's intent ("operate on database X") matches where the MV will be created. @@ -598,10 +629,17 @@ private static CompiledCreateMaterializedView compileCreateMaterializedView(Stri TableConfigBuilder builder = new TableConfigBuilder(TableType.OFFLINE) .setTableName(tableNameForConfig) .setIsMaterializedView(true); - MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder); + // The handler routes the MV properties onto the builder and returns the minion task type it + // stamped (default: MaterializedViewTask). The consistency check below uses that task type. + String mvTaskType = mvHandler.applyTaskConfig(properties, definedSql, schedule, builder); + if (mvTaskType == null) { + throw new DdlCompilationException("MaterializedViewDdlHandler " + + mvHandler.getClass().getName() + " returned a null task type from applyTaskConfig; it " + + "must return the task type it stamped onto the table config."); + } TableConfig tableConfig = builder.build(); - validateMaterializedViewConsistency(schema, tableConfig); + validateMaterializedViewConsistency(schema, tableConfig, mvTaskType); return new CompiledCreateMaterializedView(resolved.getDatabaseName(), schema, tableConfig, node.isIfNotExists(), warnings); @@ -734,64 +772,10 @@ private static int lineColToOffset(String sql, int line, int col) { + sql.length() + "."); } - /// Walks the parsed AS-clause AST and throws a clear, MV-specific error if any JOIN node - /// is found at any depth (top-level FROM, subquery FROM, lateral join, etc.). See the - /// call-site comment in [#compileCreateMaterializedView] for *why* we do this against the - /// AST rather than letting the downstream re-parse / analyzer surface the limitation. - private static void rejectJoinInDefinedSql(SqlNode queryNode) { - if (containsJoin(queryNode)) { - throw new DdlCompilationException( - "CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. " - + "Materialized views currently read from a single source table; " - + "pre-join the inputs into a base table and reference that single table " - + "in the MV definition."); - } - } - - /// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call. We walk via the - /// `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays wrapper-agnostic - /// (SqlOrderBy, SqlWith, SqlExplain) — mirroring how [#collectPositions] walks the tree. - /// Leaves (SqlLiteral, SqlIdentifier, ...) terminate the recursion naturally. - private static boolean containsJoin(@Nullable SqlNode node) { - if (node == null) { - return false; - } - if (node.getKind() == SqlKind.JOIN) { - return true; - } - if (node instanceof SqlCall) { - for (SqlNode child : ((SqlCall) node).getOperandList()) { - if (containsJoin(child)) { - return true; - } - } - } else if (node instanceof SqlNodeList) { - for (SqlNode child : (SqlNodeList) node) { - if (containsJoin(child)) { - return true; - } - } - } - return false; - } - - /// Sanity check: the substring we extracted must be a standalone parseable Pinot query. - /// This guards against DDL-layer slicing bugs (off-by-one in [#lineColToOffset], parser - /// position quirks) so the error surfaces in the DDL layer rather than at first scheduler - /// tick or at create-time analysis (PR 3). - private static void verifyDefinedSqlIsParseable(String definedSql) { - try { - CalciteSqlParser.compileToPinotQuery(definedSql); - } catch (Exception e) { - throw new DdlCompilationException( - "AS did not re-parse as a Pinot query: " + e.getMessage() - + " (extracted text: " + definedSql + ")", e); - } - } - /// Cross-checks: `timeColumnName` must reference a declared DATETIME column, and /// `bucketTimePeriod` must be present so the scheduler has a window size. - private static void validateMaterializedViewConsistency(Schema schema, TableConfig tableConfig) { + private static void validateMaterializedViewConsistency(Schema schema, TableConfig tableConfig, + String mvTaskType) { String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); if (timeColumnName == null || timeColumnName.isEmpty()) { throw new DdlCompilationException( @@ -809,9 +793,18 @@ private static void validateMaterializedViewConsistency(Schema schema, TableConf } Map mvTaskConfig = tableConfig.getTaskConfig() == null ? null - : tableConfig.getTaskConfig().getConfigsForTaskType(MaterializedViewTask.TASK_TYPE); - if (mvTaskConfig == null - || !mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) { + : tableConfig.getTaskConfig().getConfigsForTaskType(mvTaskType); + if (mvTaskConfig == null) { + // The handler's applyTaskConfig returned this task type but did not stamp a matching task + // config onto the builder — a handler-contract violation, not a user error. Surface it as + // such so a custom MaterializedViewDdlHandler author gets an actionable diagnostic rather + // than the misleading "bucketTimePeriod missing" message below. + throw new DdlCompilationException( + "MaterializedViewDdlHandler returned task type '" + mvTaskType + "' from applyTaskConfig " + + "but did not stamp a task config under it; the returned task type must match the " + + "task config written to the table."); + } + if (!mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) { throw new DdlCompilationException( "CREATE MATERIALIZED VIEW requires a 'bucketTimePeriod' property (e.g. '1d', '1h'); " + "it defines the time window each refresh tick materializes."); diff --git a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java new file mode 100644 index 000000000000..5d2e3c55b099 --- /dev/null +++ b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.sql.ddl.compile; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.sql.SqlNode; +import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlParser; + + +/// Default, single-stage-engine (SSE) materialized-view DDL handler. +/// +/// A materialized view reads from exactly one source table: a JOIN in the `AS ` clause is +/// rejected, and the MV task configuration is routed under the built-in `MaterializedViewTask` task +/// type. This is the behavior of OSS Pinot when no alternative handler is registered. +public class DefaultMaterializedViewDdlHandler implements MaterializedViewDdlHandler { + + @Override + public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map properties) { + if (MaterializedViewDdlHandler.containsJoin(queryNode)) { + throw new DdlCompilationException( + "CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. " + + "Materialized views currently read from a single source table; " + + "pre-join the inputs into a base table and reference that single table " + + "in the MV definition."); + } + /// Re-compile the extracted definedSQL as a single-stage Pinot query. This both guards against + /// DDL-layer slicing bugs (off-by-one in the parser-position extraction) and enforces SSE + /// compatibility, so the error surfaces here rather than at the first scheduler tick / analysis. + try { + CalciteSqlParser.compileToPinotQuery(definedSql); + } catch (Exception e) { + throw new DdlCompilationException( + "AS did not re-parse as a single-stage Pinot query: " + e.getMessage() + + " (extracted text: " + definedSql + ")", e); + } + } + + @Override + public String applyTaskConfig(Map properties, String definedSql, + @Nullable String schedule, TableConfigBuilder builder) { + MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder); + return MaterializedViewTask.TASK_TYPE; + } +} diff --git a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java new file mode 100644 index 000000000000..303586501eeb --- /dev/null +++ b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.sql.ddl.compile; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; + + +/// Extension point for compiling `CREATE MATERIALIZED VIEW ... AS ` statements. +/// +/// The `definedSQL` of a materialized view can be executed by either query engine; the handler — not +/// the compiler — decides and validates accordingly. The default implementation +/// ([DefaultMaterializedViewDdlHandler]) targets the **single-stage engine (SSE)**: it verifies the +/// `definedSQL` compiles as a single-stage Pinot query (which restricts it to a single source table) +/// and routes the MV task configuration under the built-in `MaterializedViewTask` task type. +/// +/// Downstream distributions can register an alternative handler via +/// [DdlCompiler#setMaterializedViewDdlHandler] to target the **multi-stage engine (MSE)** — accepting +/// multi-source / JOIN definitions and routing them under a different minion task type. Because the +/// controller validates the resulting `TableConfig` (running the task generator's validation) +/// *before* persisting it, an MSE handler MUST stamp a task type whose generator can validate that +/// definition — stamping the built-in `MaterializedViewTask` for a JOIN would be rejected by the +/// single-source [org.apache.pinot.materializedview.analysis.MaterializedViewAnalyzer]. +/// +/// **Contract for non-built-in task types.** A handler that stamps a task type other than the +/// built-in `MaterializedViewTask` is responsible for the *complete* runtime of that task type — +/// not only the minion task generator / executor that materializes the view, but also its +/// definition-metadata persistence and consistency tracking. OSS's built-in controller-side MV +/// machinery (`MaterializedViewDefinitionMetadata` persistence and the +/// `MaterializedViewConsistencyManager`) keys on `MaterializedViewTask` and therefore manages only +/// built-in MVs; it intentionally skips MVs stamped with a different task type (the +/// `isMaterializedView=true` flag and DDL lifecycle still apply uniformly). This keeps the built-in +/// single-source freshness model from being applied to definitions it cannot reason about (e.g. a +/// multi-source JOIN), and leaves freshness/consistency to the task type that owns the MV. +public interface MaterializedViewDdlHandler { + + /// Validates the MV's `AS ` clause for the target engine. Called before column resolution / + /// schema inference. The handler is responsible for whatever engine-specific checks apply — the + /// default (SSE) handler re-compiles `definedSql` as a single-stage Pinot query (rejecting + /// multi-source / JOIN queries); an MSE handler performs a multi-stage-compatible check. Throw + /// [DdlCompilationException] with a user-actionable message when the definition is not supported. + /// + /// @param queryNode the parsed `AS ` SqlNode (use [#containsJoin] for AST-level checks) + /// @param definedSql the extracted verbatim `AS ` text (re-parse it to guard against + /// DDL-layer slicing bugs and to enforce engine compatibility) + /// @param properties the MV's `PROPERTIES (...)` map (raw keys/values as typed by the user) + void validateDefinedQuery(SqlNode queryNode, String definedSql, Map properties); + + /// Whether the MV schema may be inferred from the `AS ` projection when the DDL omits an + /// explicit column list. Defaults to {@code true} (single-source projection inference). A handler + /// whose `definedSQL` may be multi-source — where projection inference is not supported — returns + /// {@code false}, so the compiler requires an explicit column list. + /// + /// @param properties the MV's `PROPERTIES (...)` map + default boolean supportsSchemaInference(Map properties) { + return true; + } + + /// Routes the MV's `PROPERTIES (...)` plus the synthetic `definedSql` / `schedule` onto `builder` + /// and returns the minion task type that was stamped (must be non-null and equal to the task-type + /// key written to `builder`). The returned task type is used to validate MV consistency (e.g. that + /// `bucketTimePeriod` is present under it). + /// + /// @param properties the MV's `PROPERTIES (...)` map + /// @param definedSql the verbatim `AS ` text + /// @param schedule the Quartz cron derived from `REFRESH EVERY`, or {@code null} when absent + /// @param builder the table-config builder to populate (already has name + isMaterializedView) + /// @return the minion task type stamped onto the builder (e.g. {@code MaterializedViewTask}) + String applyTaskConfig(Map properties, String definedSql, @Nullable String schedule, + TableConfigBuilder builder); + + /// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call. Walks the + /// `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays wrapper-agnostic + /// (SqlOrderBy, SqlWith, SqlExplain). Leaves (SqlLiteral, SqlIdentifier, ...) terminate naturally. + static boolean containsJoin(@Nullable SqlNode node) { + if (node == null) { + return false; + } + if (node.getKind() == SqlKind.JOIN) { + return true; + } + if (node instanceof SqlCall) { + for (SqlNode child : ((SqlCall) node).getOperandList()) { + if (containsJoin(child)) { + return true; + } + } + } else if (node instanceof SqlNodeList) { + for (SqlNode child : (SqlNodeList) node) { + if (containsJoin(child)) { + return true; + } + } + } + return false; + } +} diff --git a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java index 32f3269ff289..9c2eaf0e560f 100644 --- a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java +++ b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java @@ -139,6 +139,16 @@ private MaterializedViewPropertyRouter() { /// * The caller has already validated `properties` keys do not duplicate (case-insensitive). public static void apply(Map properties, String definedSql, @Nullable String schedule, TableConfigBuilder builder) { + apply(properties, definedSql, schedule, builder, MaterializedViewTask.TASK_TYPE); + } + + /// Same as [#apply(Map, String, String, TableConfigBuilder)] but stores the routed MV task config + /// under the given `taskType` instead of the built-in `MaterializedViewTask`. Used by alternative + /// [MaterializedViewDdlHandler]s that materialize the MV via a different minion task type (e.g. a + /// multi-stage-engine generator). Bare task-property keys (e.g. `bucketTimePeriod`) route the same + /// way regardless of `taskType`; only the task type the config is stored under changes. + public static void apply(Map properties, String definedSql, + @Nullable String schedule, TableConfigBuilder builder, String mvTaskType) { Map> taskConfigs = new LinkedHashMap<>(); Map customConfigs = new LinkedHashMap<>(); Map mvTaskConfig = new LinkedHashMap<>(); @@ -210,7 +220,11 @@ public static void apply(Map properties, String definedSql, } String taskType = afterPrefix.substring(0, dot); String taskKey = afterPrefix.substring(dot + 1); - if (MaterializedViewTask.TASK_TYPE.equals(taskType) + // Compare against the MV's own task type (mvTaskType), not the hard-coded built-in, so a + // custom handler's task type is recognized as "this MV's task config" rather than treated + // as an unrelated composed task type (which would be dropped by the put(mvTaskType, ...) + // below). + if (mvTaskType.equals(taskType) && (SCHEDULE_KEY.equals(taskKey.toLowerCase(Locale.ROOT)) || MaterializedViewTask.DEFINED_SQL_KEY.equalsIgnoreCase(taskKey))) { throw new DdlCompilationException( @@ -218,9 +232,9 @@ public static void apply(Map properties, String definedSql, + "(REFRESH EVERY for 'schedule', AS for 'definedSQL'); " + "remove it from PROPERTIES."); } - if (MaterializedViewTask.TASK_TYPE.equals(taskType)) { + if (mvTaskType.equals(taskType)) { // Canonicalize the knob casing the same way the bare-form branch does, so - // `task.MaterializedViewTask.BUCKETTIMEPERIOD` and bare `BUCKETTIMEPERIOD` end up + // `task..BUCKETTIMEPERIOD` and bare `BUCKETTIMEPERIOD` end up // under the same on-wire key (the constant casing in CommonConstants). String canonical = TASK_CONFIG_KEYS.getOrDefault(taskKey.toLowerCase(Locale.ROOT), taskKey); mvTaskConfig.put(canonical, value); @@ -244,7 +258,7 @@ public static void apply(Map properties, String definedSql, mvTaskConfig.put(SCHEDULE_KEY, schedule); } - taskConfigs.put(MaterializedViewTask.TASK_TYPE, mvTaskConfig); + taskConfigs.put(mvTaskType, mvTaskConfig); builder.setTaskConfig(new TableTaskConfig(taskConfigs)); if (!customConfigs.isEmpty()) { diff --git a/pinot-sql-ddl/src/test/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerTest.java b/pinot-sql-ddl/src/test/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerTest.java new file mode 100644 index 000000000000..300e08f48c5e --- /dev/null +++ b/pinot-sql-ddl/src/test/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerTest.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.sql.ddl.compile; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.sql.SqlNode; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; + + +/// Tests the [MaterializedViewDdlHandler] extension point: a registered handler can accept a JOIN in +/// the `AS` clause and route the MV task config under an alternative task type, while the default +/// handler preserves single-source / single-stage behavior. +public class MaterializedViewDdlHandlerTest { + private static final String JOIN_DDL = + "CREATE MATERIALIZED VIEW mv (" + + " ts TIMESTAMP DATETIME FORMAT '1:MILLISECONDS:TIMESTAMP' GRANULARITY '1:DAYS'," + + " country STRING," + + " amount_sum DOUBLE METRIC" + + ")" + + " REFRESH EVERY 1 DAY" + + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = '1d')" + + " AS SELECT f.ts, d.country, SUM(f.amount) AS amount_sum" + + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts, d.country"; + + @AfterMethod + public void restoreDefaultHandler() { + /// The handler is process-wide; restore the default so sibling tests see single-source behavior. + DdlCompiler.setMaterializedViewDdlHandler(new DefaultMaterializedViewDdlHandler()); + } + + @Test + public void defaultsToSingleSourceHandler() { + assertTrue(DdlCompiler.getMaterializedViewDdlHandler() instanceof DefaultMaterializedViewDdlHandler); + } + + @Test + public void defaultHandlerRejectsJoin() { + DdlCompilationException e = + expectThrows(DdlCompilationException.class, () -> DdlCompiler.compile(JOIN_DDL)); + assertTrue(e.getMessage().contains("JOIN"), e.getMessage()); + } + + @Test + public void registeredHandlerAcceptsJoinAndRoutesToCustomTaskType() { + DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler()); + + CompiledDdl compiled = DdlCompiler.compile(JOIN_DDL); + assertEquals(compiled.getOperation(), DdlOperation.CREATE_MATERIALIZED_VIEW); + TableConfig tableConfig = ((CompiledCreateMaterializedView) compiled).getTableConfig(); + assertTrue(tableConfig.isMaterializedView()); + + Map> taskTypes = tableConfig.getTaskConfig().getTaskTypeConfigsMap(); + /// Stamped under the handler's task type; the built-in single-stage type is absent. + assertTrue(taskTypes.containsKey(MultiSourceTestHandler.TASK_TYPE)); + assertFalse(taskTypes.containsKey(MaterializedViewTask.TASK_TYPE)); + + Map mvTaskConfig = taskTypes.get(MultiSourceTestHandler.TASK_TYPE); + assertEquals(mvTaskConfig.get(MaterializedViewTask.DEFINED_SQL_KEY), + "SELECT f.ts, d.country, SUM(f.amount) AS amount_sum" + + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts, d.country"); + /// bucketTimePeriod routed under the custom task type, and the consistency check passed against it. + assertEquals(mvTaskConfig.get(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY), "1d"); + } + + @Test + public void customTaskTypePrefixedKnobsArePreserved() { + DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler()); + + /// A `task..` property must survive into the stamped task config (it must + /// not be overwritten by the synthetic task-config map under the same task type). + CompiledDdl compiled = DdlCompiler.compile( + "CREATE MATERIALIZED VIEW mv (" + + " ts TIMESTAMP DATETIME FORMAT '1:MILLISECONDS:TIMESTAMP' GRANULARITY '1:DAYS'," + + " amount_sum DOUBLE METRIC" + + ")" + + " REFRESH EVERY 1 DAY" + + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = '1d'," + + " 'task." + MultiSourceTestHandler.TASK_TYPE + ".customKnob' = 'v')" + + " AS SELECT f.ts, SUM(f.amount) AS amount_sum" + + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts"); + Map mvTaskConfig = ((CompiledCreateMaterializedView) compiled).getTableConfig() + .getTaskConfig().getTaskTypeConfigsMap().get(MultiSourceTestHandler.TASK_TYPE); + assertEquals(mvTaskConfig.get("customKnob"), "v"); + /// Synthetic / bare-form keys remain present alongside the prefixed knob. + assertEquals(mvTaskConfig.get(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY), "1d"); + assertTrue(mvTaskConfig.containsKey(MaterializedViewTask.DEFINED_SQL_KEY)); + } + + @Test + public void registeredHandlerJoinWithoutExplicitColumnsRejected() { + DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler()); + + /// Even when the handler permits a JOIN, schema inference (no column list) is single-source-only. + DdlCompilationException e = expectThrows(DdlCompilationException.class, () -> DdlCompiler.compile( + "CREATE MATERIALIZED VIEW mv" + + " REFRESH EVERY 1 DAY" + + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = '1d')" + + " AS SELECT f.ts, d.country FROM fact f JOIN dim d ON f.id = d.id")); + assertTrue(e.getMessage().contains("explicit column list"), e.getMessage()); + } + + @Test + public void handlerReturningNullTaskTypeFailsClearly() { + DdlCompiler.setMaterializedViewDdlHandler(new MaterializedViewDdlHandler() { + @Override + public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map properties) { + } + + @Override + public String applyTaskConfig(Map properties, String definedSql, + @Nullable String schedule, TableConfigBuilder builder) { + return null; + } + }); + DdlCompilationException e = + expectThrows(DdlCompilationException.class, () -> DdlCompiler.compile(JOIN_DDL)); + assertTrue(e.getMessage().contains("null task type"), e.getMessage()); + } + + @Test + public void handlerReturningUnstampedTaskTypeFailsClearly() { + DdlCompiler.setMaterializedViewDdlHandler(new MaterializedViewDdlHandler() { + @Override + public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map properties) { + } + + @Override + public String applyTaskConfig(Map properties, String definedSql, + @Nullable String schedule, TableConfigBuilder builder) { + /// Stamp under one task type but return a different one — a handler-contract violation. + MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder, "StampedTask"); + return "ReturnedButNotStampedTask"; + } + }); + DdlCompilationException e = + expectThrows(DdlCompilationException.class, () -> DdlCompiler.compile(JOIN_DDL)); + assertTrue(e.getMessage().contains("did not stamp a task config"), e.getMessage()); + } + + /// Test handler that permits multi-source (JOIN) definitions and routes the MV task config under a + /// distinct task type via the task-type-parameterized [MaterializedViewPropertyRouter#apply]. + private static final class MultiSourceTestHandler implements MaterializedViewDdlHandler { + static final String TASK_TYPE = "CustomMaterializedViewTask"; + + @Override + public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map properties) { + /// Multi-source allowed: no JOIN rejection, no single-stage re-compilation. + } + + @Override + public boolean supportsSchemaInference(Map properties) { + /// Multi-source projection inference is not supported; an explicit column list is required. + return false; + } + + @Override + public String applyTaskConfig(Map properties, String definedSql, + @Nullable String schedule, TableConfigBuilder builder) { + MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder, TASK_TYPE); + return TASK_TYPE; + } + } +}