Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5473,8 +5473,18 @@ private void persistMaterializedViewDefinitionMetadataBestEffort(TableConfig tab
try {
Map<String, String> taskConfigs = tableConfig.getMaterializedViewTaskConfigs();
if (taskConfigs == null) {
LOGGER.warn("MV table {} has no MaterializedViewTask config; skipping definition metadata persist",
tableNameWithType);
// This MV is not materialized by the built-in MaterializedViewTask. A downstream
// MaterializedViewDdlHandler / task type may materialize it via its own minion runtime; by
// contract (see MaterializedViewDdlHandler) that runtime also owns its definition metadata
// and consistency tracking, so the built-in machinery here intentionally does not manage it.
if (tableConfig.hasMaterializedViewTaskWithDefinedSql()) {
LOGGER.info("MV table {} uses a non-built-in MV task type; its definition metadata is owned "
+ "by that task type's runtime — skipping built-in MaterializedViewTask metadata persist",
tableNameWithType);
} else {
LOGGER.warn("MV table {} has no MaterializedViewTask config; skipping definition metadata persist",
tableNameWithType);
}
return;
}
String definedSql = taskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
Expand Down Expand Up @@ -5705,8 +5715,16 @@ private void notifyMaterializedViewConsistencyManagerForTableCreate(TableConfig
}
Map<String, String> materializedViewTaskConfigs = tableConfig.getMaterializedViewTaskConfigs();
if (materializedViewTaskConfigs == null) {
LOGGER.warn("MV table {} has no MaterializedViewTask config for consistency registration",
tableConfig.getTableName());
// Not a built-in MaterializedViewTask MV: a downstream task type materializes it and, by
// contract (see MaterializedViewDdlHandler), owns its own consistency tracking. The built-in
// MaterializedViewConsistencyManager intentionally does not register it here.
if (tableConfig.hasMaterializedViewTaskWithDefinedSql()) {
LOGGER.info("MV table {} uses a non-built-in MV task type; consistency is owned by that task "
+ "type's runtime — skipping built-in consistency registration", tableConfig.getTableName());
} else {
LOGGER.warn("MV table {} has no MaterializedViewTask config for consistency registration",
tableConfig.getTableName());
}
return;
}
String definedSQL = materializedViewTaskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
Expand Down Expand Up @@ -88,9 +87,29 @@
/// Stateless and thread-safe. All entry points are static.
public final class DdlCompiler {

/// Pluggable handler for `CREATE MATERIALIZED VIEW` compilation (query validation + task-config
/// routing). Defaults to single-source / single-stage behavior. A distribution that supports
/// richer MV definitions (e.g. a multi-stage-engine MV whose `AS` clause is a JOIN) installs its
/// own handler via [#setMaterializedViewDdlHandler] once at controller startup, before any DDL is
/// served. Volatile because it is read on the DDL request path and may be set from a different
/// (startup) thread; not intended to be swapped while DDL is in flight.
private static volatile MaterializedViewDdlHandler _materializedViewDdlHandler =
new DefaultMaterializedViewDdlHandler();

private DdlCompiler() {
}

/// Installs the [MaterializedViewDdlHandler] used for all subsequent `CREATE MATERIALIZED VIEW`
/// compilations. Call once at controller startup; defaults to [DefaultMaterializedViewDdlHandler].
public static void setMaterializedViewDdlHandler(MaterializedViewDdlHandler handler) {
_materializedViewDdlHandler = handler;
}

/// Returns the active materialized-view DDL handler (never null; defaults to single-source SSE).
public static MaterializedViewDdlHandler getMaterializedViewDdlHandler() {
return _materializedViewDdlHandler;
}

/// Parses and compiles a DDL statement using a stateless {@link DdlCompileContext}.
///
/// @deprecated Use {@link #compile(String, DdlCompileContext)} and supply a real context.
Expand Down Expand Up @@ -526,11 +545,14 @@ private static CompiledCreateMaterializedView compileCreateMaterializedView(Stri
List<String> warnings = new ArrayList<>();
Map<String, String> properties = resolveProperties(node.getProperties().getList());

// Reject JOIN early so the inferer's single-source assumption (and the analyzer's
// downstream check) cannot be violated by a definedSql we haven't validated yet.
rejectJoinInDefinedSql(node.getQuery());
// The engine (SSE vs MSE) is the registered handler's choice — not the compiler's. Extract the
// verbatim AS-clause text, then let the handler validate it for its target engine (the default
// handler re-compiles it as a single-stage Pinot query; an MSE handler does a multi-stage check).
// Done before column resolution / schema inference so the single-source inferer cannot be fed a
// definition the handler hasn't accepted.
MaterializedViewDdlHandler mvHandler = getMaterializedViewDdlHandler();
String definedSql = extractDefinedSql(originalSql, node.getQuery());
verifyDefinedSqlIsParseable(definedSql);
mvHandler.validateDefinedQuery(node.getQuery(), definedSql, properties);

// Two paths:
// 1) Explicit column list — legacy path, will be deprecated once the inferer matures.
Expand All @@ -539,6 +561,15 @@ private static CompiledCreateMaterializedView compileCreateMaterializedView(Stri
// surfaces a clear message when it's absent.
List<ResolvedColumnDefinition> columns;
if (node.getColumns().getList().isEmpty()) {
// Schema inference from the AS <query> projection is single-source-only. A handler whose
// definedSQL may be multi-source (e.g. a multi-stage-engine MV) reports that it does not
// support inference, so an explicit column list is required.
if (!mvHandler.supportsSchemaInference(properties)) {
throw new DdlCompilationException(
"CREATE MATERIALIZED VIEW requires an explicit column list for this materialized view; "
+ "schema inference from the AS <query> projection is only supported for "
+ "single-source materialized views.");
}
// Fall back to the request-header database when the DDL itself does not qualify the
// MV name — Calcite needs SOME database to resolve `FROM src` against, and the
// header's intent ("operate on database X") matches where the MV will be created.
Expand Down Expand Up @@ -598,10 +629,17 @@ private static CompiledCreateMaterializedView compileCreateMaterializedView(Stri
TableConfigBuilder builder = new TableConfigBuilder(TableType.OFFLINE)
.setTableName(tableNameForConfig)
.setIsMaterializedView(true);
MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder);
// The handler routes the MV properties onto the builder and returns the minion task type it
// stamped (default: MaterializedViewTask). The consistency check below uses that task type.
String mvTaskType = mvHandler.applyTaskConfig(properties, definedSql, schedule, builder);
Comment thread
xiangfu0 marked this conversation as resolved.
if (mvTaskType == null) {
throw new DdlCompilationException("MaterializedViewDdlHandler "
+ mvHandler.getClass().getName() + " returned a null task type from applyTaskConfig; it "
+ "must return the task type it stamped onto the table config.");
}
TableConfig tableConfig = builder.build();

validateMaterializedViewConsistency(schema, tableConfig);
validateMaterializedViewConsistency(schema, tableConfig, mvTaskType);

return new CompiledCreateMaterializedView(resolved.getDatabaseName(), schema, tableConfig,
node.isIfNotExists(), warnings);
Expand Down Expand Up @@ -734,64 +772,10 @@ private static int lineColToOffset(String sql, int line, int col) {
+ sql.length() + ".");
}

/// Walks the parsed AS-clause AST and throws a clear, MV-specific error if any JOIN node
/// is found at any depth (top-level FROM, subquery FROM, lateral join, etc.). See the
/// call-site comment in [#compileCreateMaterializedView] for *why* we do this against the
/// AST rather than letting the downstream re-parse / analyzer surface the limitation.
private static void rejectJoinInDefinedSql(SqlNode queryNode) {
if (containsJoin(queryNode)) {
throw new DdlCompilationException(
"CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. "
+ "Materialized views currently read from a single source table; "
+ "pre-join the inputs into a base table and reference that single table "
+ "in the MV definition.");
}
}

/// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call. We walk via the
/// `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays wrapper-agnostic
/// (SqlOrderBy, SqlWith, SqlExplain) — mirroring how [#collectPositions] walks the tree.
/// Leaves (SqlLiteral, SqlIdentifier, ...) terminate the recursion naturally.
private static boolean containsJoin(@Nullable SqlNode node) {
if (node == null) {
return false;
}
if (node.getKind() == SqlKind.JOIN) {
return true;
}
if (node instanceof SqlCall) {
for (SqlNode child : ((SqlCall) node).getOperandList()) {
if (containsJoin(child)) {
return true;
}
}
} else if (node instanceof SqlNodeList) {
for (SqlNode child : (SqlNodeList) node) {
if (containsJoin(child)) {
return true;
}
}
}
return false;
}

/// Sanity check: the substring we extracted must be a standalone parseable Pinot query.
/// This guards against DDL-layer slicing bugs (off-by-one in [#lineColToOffset], parser
/// position quirks) so the error surfaces in the DDL layer rather than at first scheduler
/// tick or at create-time analysis (PR 3).
private static void verifyDefinedSqlIsParseable(String definedSql) {
try {
CalciteSqlParser.compileToPinotQuery(definedSql);
} catch (Exception e) {
throw new DdlCompilationException(
"AS <query> did not re-parse as a Pinot query: " + e.getMessage()
+ " (extracted text: " + definedSql + ")", e);
}
}

/// Cross-checks: `timeColumnName` must reference a declared DATETIME column, and
/// `bucketTimePeriod` must be present so the scheduler has a window size.
private static void validateMaterializedViewConsistency(Schema schema, TableConfig tableConfig) {
private static void validateMaterializedViewConsistency(Schema schema, TableConfig tableConfig,
String mvTaskType) {
String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
if (timeColumnName == null || timeColumnName.isEmpty()) {
throw new DdlCompilationException(
Expand All @@ -809,9 +793,18 @@ private static void validateMaterializedViewConsistency(Schema schema, TableConf
}
Map<String, String> mvTaskConfig = tableConfig.getTaskConfig() == null
? null
: tableConfig.getTaskConfig().getConfigsForTaskType(MaterializedViewTask.TASK_TYPE);
if (mvTaskConfig == null
|| !mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) {
: tableConfig.getTaskConfig().getConfigsForTaskType(mvTaskType);
if (mvTaskConfig == null) {
// The handler's applyTaskConfig returned this task type but did not stamp a matching task
// config onto the builder — a handler-contract violation, not a user error. Surface it as
// such so a custom MaterializedViewDdlHandler author gets an actionable diagnostic rather
// than the misleading "bucketTimePeriod missing" message below.
throw new DdlCompilationException(
"MaterializedViewDdlHandler returned task type '" + mvTaskType + "' from applyTaskConfig "
+ "but did not stamp a task config under it; the returned task type must match the "
+ "task config written to the table.");
}
if (!mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) {
throw new DdlCompilationException(
"CREATE MATERIALIZED VIEW requires a 'bucketTimePeriod' property (e.g. '1d', '1h'); "
+ "it defines the time window each refresh tick materializes.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.sql.ddl.compile;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.calcite.sql.SqlNode;
import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlParser;


/// Default, single-stage-engine (SSE) materialized-view DDL handler.
///
/// A materialized view reads from exactly one source table: a JOIN in the `AS <query>` clause is
/// rejected, and the MV task configuration is routed under the built-in `MaterializedViewTask` task
/// type. This is the behavior of OSS Pinot when no alternative handler is registered.
public class DefaultMaterializedViewDdlHandler implements MaterializedViewDdlHandler {

@Override
public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map<String, String> properties) {
if (MaterializedViewDdlHandler.containsJoin(queryNode)) {
throw new DdlCompilationException(
"CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. "
+ "Materialized views currently read from a single source table; "
+ "pre-join the inputs into a base table and reference that single table "
+ "in the MV definition.");
}
/// Re-compile the extracted definedSQL as a single-stage Pinot query. This both guards against
/// DDL-layer slicing bugs (off-by-one in the parser-position extraction) and enforces SSE
/// compatibility, so the error surfaces here rather than at the first scheduler tick / analysis.
try {
CalciteSqlParser.compileToPinotQuery(definedSql);
} catch (Exception e) {
throw new DdlCompilationException(
"AS <query> did not re-parse as a single-stage Pinot query: " + e.getMessage()
+ " (extracted text: " + definedSql + ")", e);
}
}

@Override
public String applyTaskConfig(Map<String, String> properties, String definedSql,
@Nullable String schedule, TableConfigBuilder builder) {
MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder);
return MaterializedViewTask.TASK_TYPE;
}
}
Loading
Loading