From 0dc3c5aa7ec157e344aab36ca5d5e7ff50d63268 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 14 Feb 2024 13:43:35 -0500 Subject: [PATCH 01/12] ingestHandlers validate sqlNode as DruidSqlIngest instead of the select source node. Implemented validateInsert method in DruidSqlValidtor to validate the respective node and moved a lot of the validation being done previously in the ingestHandlers to this overriden method. In the next commit will try to pull back out some of this validation to make this code change smaller and easier to review. --- .../apache/druid/msq/test/MSQTestBase.java | 6 + .../sql/calcite/parser/DruidSqlInsert.java | 3 +- .../sql/calcite/parser/DruidSqlReplace.java | 5 +- .../sql/calcite/planner/CalcitePlanner.java | 11 +- .../sql/calcite/planner/DruidPlanner.java | 29 +- .../planner/DruidSqlIngestOperator.java | 116 +++ .../planner/DruidSqlToRelConverter.java | 69 ++ .../calcite/planner/DruidSqlValidator.java | 685 +++++++++++++++++- .../sql/calcite/planner/IngestHandler.java | 144 ++-- .../sql/calcite/planner/QueryHandler.java | 35 +- .../sql/calcite/CalciteInsertDmlTest.java | 4 - 11 files changed, 1038 insertions(+), 69 deletions(-) create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index f8fc01b9369f..8319f0c48b9f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -162,6 +162,9 @@ import org.apache.druid.sql.calcite.export.TestExportStorageConnectorProvider; import org.apache.druid.sql.calcite.external.ExternalDataSource; import org.apache.druid.sql.calcite.external.ExternalOperatorConversion; +import org.apache.druid.sql.calcite.external.HttpOperatorConversion; +import org.apache.druid.sql.calcite.external.InlineOperatorConversion; +import org.apache.druid.sql.calcite.external.LocalOperatorConversion; import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.planner.PlannerConfig; @@ -351,6 +354,9 @@ public void configure(Binder binder) binder.install(new NestedDataModule()); NestedDataModule.registerHandlersAndSerde(); SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, HttpOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, InlineOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, LocalOperatorConversion.class); } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java index 7171a889ae0d..75bcfb86d52d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java @@ -25,6 +25,7 @@ import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator; import javax.annotation.Nonnull; import javax.annotation.Nullable; 
@@ -39,7 +40,7 @@ public class DruidSqlInsert extends DruidSqlIngest public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity"; // This allows reusing super.unparse - public static final SqlOperator OPERATOR = SqlInsert.OPERATOR; + public static final SqlOperator OPERATOR = DruidSqlIngestOperator.INSERT_OPERATOR; public static DruidSqlInsert create( @Nonnull SqlInsert insertNode, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java index 86f78b4d6d75..d4d7bcf6fd03 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java @@ -20,15 +20,14 @@ package org.apache.druid.sql.calcite.parser; import org.apache.calcite.sql.SqlInsert; -import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.SqlSpecialOperator; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.util.ImmutableNullableList; +import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -43,7 +42,7 @@ public class DruidSqlReplace extends DruidSqlIngest { public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks"; - public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER); + public static final SqlOperator OPERATOR = DruidSqlIngestOperator.REPLACE_OPERATOR; private final SqlNode replaceTimeQuery; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java index 2cda011848b5..00e4fe278b6a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java @@ -86,6 +86,7 @@ public class CalcitePlanner implements Planner, ViewExpander private final @Nullable RelOptCostFactory costFactory; private final Context context; private final CalciteConnectionConfig connectionConfig; + private final DruidSqlValidator.ValidatorContext validatorContext; private final RelDataTypeSystem typeSystem; /** @@ -118,9 +119,10 @@ public class CalcitePlanner implements Planner, ViewExpander * {@link org.apache.calcite.tools.Frameworks#getPlanner} instead. 
*/ @SuppressWarnings("method.invocation.invalid") - public CalcitePlanner(FrameworkConfig config) + public CalcitePlanner(FrameworkConfig config, DruidSqlValidator.ValidatorContext validatorContext) { this.costFactory = config.getCostFactory(); + this.validatorContext = validatorContext; this.defaultSchema = config.getDefaultSchema(); this.operatorTable = config.getOperatorTable(); this.programs = config.getPrograms(); @@ -295,11 +297,11 @@ public RelRoot rel(SqlNode sql) final SqlToRelConverter.Config config = sqlToRelConverterConfig.withTrimUnusedFields(false); final SqlToRelConverter sqlToRelConverter = - new SqlToRelConverter(this, validator, + new DruidSqlToRelConverter(this, validator, createCatalogReader(), cluster, convertletTable, config ); RelRoot root = - sqlToRelConverter.convertQuery(validatedSqlNode, false, true); + sqlToRelConverter.convertQuery(sql, false, true); root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)); final RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster, null); @@ -408,7 +410,8 @@ private SqlValidator createSqlValidator(CalciteCatalogReader catalogReader) catalogReader, getTypeFactory(), validatorConfig, - context.unwrapOrThrow(PlannerContext.class) + context.unwrapOrThrow(PlannerContext.class), + validatorContext ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 4b697a0d5dfa..82052c0e4770 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -119,7 +119,7 @@ public AuthResult( ) { this.frameworkConfig = frameworkConfig; - this.planner = new CalcitePlanner(frameworkConfig); + this.planner = new CalcitePlanner(frameworkConfig, new ValidatorContextImpl()); this.plannerContext = plannerContext; this.engine = engine; this.hook = hook == null ? NoOpPlannerHook.INSTANCE : hook; @@ -318,6 +318,33 @@ public PlannerHook hook() } } + public class ValidatorContextImpl implements DruidSqlValidator.ValidatorContext + { + @Override + public Map queryContextMap() + { + return plannerContext.queryContextMap(); + } + + @Override + public CatalogResolver catalog() + { + return plannerContext.getPlannerToolbox().catalogResolver(); + } + + @Override + public String druidSchemaName() + { + return plannerContext.getPlannerToolbox().druidSchemaName(); + } + + @Override + public ObjectMapper jsonMapper() + { + return plannerContext.getPlannerToolbox().jsonMapper(); + } + } + public static DruidException translateException(Exception e) { try { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java new file mode 100644 index 000000000000..90be66d84b70 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.expression.AuthorizableOperator; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.parser.DruidSqlReplace; +import org.apache.druid.sql.calcite.parser.SqlGranularityLiteral; + +import java.util.HashSet; +import java.util.Set; + +public class DruidSqlIngestOperator extends SqlSpecialOperator implements AuthorizableOperator +{ + public static final SqlSpecialOperator INSERT_OPERATOR = + new DruidSqlInsertOperator(); + public static final SqlSpecialOperator REPLACE_OPERATOR = + new DruidSqlReplaceOperator(); + + public static class DruidSqlInsertOperator extends DruidSqlIngestOperator + { + public DruidSqlInsertOperator() + { + super("INSERT"); + } + + @Override + public SqlCall createCall( + SqlLiteral functionQualifier, + SqlParserPos pos, + SqlNode... operands + ) + { + return new DruidSqlInsert( + pos, + // Must match SqlInsert.getOperandList() + (SqlNodeList) operands[0], + operands[1], + operands[2], + (SqlNodeList) operands[3], + // Must match DruidSqlIngest.getOperandList() + (SqlGranularityLiteral) operands[4], + (SqlNodeList) operands[5], + null // fix this + ); + } + } + + public static class DruidSqlReplaceOperator extends DruidSqlIngestOperator + { + public DruidSqlReplaceOperator() + { + super("REPLACE"); + } + + @Override + public SqlCall createCall( + SqlLiteral functionQualifier, + SqlParserPos pos, + SqlNode... operands + ) + { + return new DruidSqlReplace( + pos, + // Must match SqlInsert.getOperandList() + (SqlNodeList) operands[0], + operands[1], + operands[2], + (SqlNodeList) operands[3], + // Must match DruidSqlIngest.getOperandList() + (SqlGranularityLiteral) operands[4], + (SqlNodeList) operands[5], + // Must match DruidSqlReplace.getOperandList() + operands[6], + null // fix this + ); + } + } + + public DruidSqlIngestOperator(String name) + { + super(name, SqlKind.INSERT); + } + + @Override + public Set computeResources(SqlCall call, boolean inputSourceTypeSecurityEnabled) + { + // resource actions are computed in the respective ingest handlers. + return new HashSet<>(); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java new file mode 100644 index 000000000000..d5f68748337a --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlToRelConverter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable.ViewExpander; +import org.apache.calcite.prepare.Prepare.CatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql2rel.SqlRexConvertletTable; +import org.apache.calcite.sql2rel.SqlToRelConverter; + +public class DruidSqlToRelConverter extends SqlToRelConverter +{ + public DruidSqlToRelConverter( + final ViewExpander viewExpander, + final SqlValidator validator, + final CatalogReader catalogReader, + RelOptCluster cluster, + final SqlRexConvertletTable convertletTable, + final Config config + ) + { + super(viewExpander, validator, catalogReader, cluster, convertletTable, config); + } + + /** + * Convert a Druid {@code INSERT} or {@code REPLACE} statement. The code is the same + * as the normal conversion, except we don't actually create the final modify node. + * Druid has its own special way to handle inserts. (This should probably change in + * some future, but doing so requires changes in the SQL engine and MSQ, which is a bit + * invasive.) + */ + @Override + protected RelNode convertInsert(SqlInsert call) + { + // Get the target type: the column types we want to write into the target datasource. + final RelDataType targetRowType = validator.getValidatedNodeType(call); + assert targetRowType != null; + + // Convert the underlying SELECT. We pushed the CLUSTERED BY clause into the SELECT + // as its ORDER BY. We claim this is the top query because MSQ doesn't actually + // use the Calcite insert node. + RelNode sourceRel = convertQueryRecursive(call.getSource(), true, targetRowType).project(); + + // We omit the column mapping and insert node that Calcite normally provides. + // Presumably MSQ does these its own way. 
+ return sourceRel; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index 390ddf96bafb..895a4dd5fe2b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -19,44 +19,102 @@ package org.apache.druid.sql.calcite.planner; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.prepare.BaseDruidSqlValidator; import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; import org.apache.calcite.runtime.CalciteContextException; import org.apache.calcite.runtime.CalciteException; +import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlNumericLiteral; import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWindow; +import org.apache.calcite.sql.SqlWith; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.BasicSqlType; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.IdentifierNamespace; +import org.apache.calcite.sql.validate.SqlValidatorException; +import org.apache.calcite.sql.validate.SqlValidatorImpl; +import org.apache.calcite.sql.validate.SqlValidatorNamespace; import org.apache.calcite.sql.validate.SqlValidatorScope; +import org.apache.calcite.sql.validate.SqlValidatorTable; +import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; +import org.apache.commons.lang.reflect.FieldUtils; +import org.apache.druid.catalog.model.Columns; +import org.apache.druid.catalog.model.facade.DatasourceFacade; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.error.InvalidSqlInput; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.query.QueryContexts; +import org.apache.druid.sql.calcite.parser.DruidSqlIngest; +import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; +import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier; import org.apache.druid.sql.calcite.run.EngineFeature; +import org.apache.druid.sql.calcite.table.DatasourceTable; import org.checkerframework.checker.nullness.qual.Nullable; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; + /** * Druid extended SQL validator. 
(At present, it doesn't actually * have any extensions yet, but it will soon.) */ class DruidSqlValidator extends BaseDruidSqlValidator { + private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); + + // Copied here from MSQE since that extension is not visible here. + public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment"; + + public interface ValidatorContext + { + Map queryContextMap(); + CatalogResolver catalog(); + String druidSchemaName(); + ObjectMapper jsonMapper(); + } + private final PlannerContext plannerContext; + private final ValidatorContext validatorContext; protected DruidSqlValidator( SqlOperatorTable opTab, CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory, Config validatorConfig, - PlannerContext plannerContext + PlannerContext plannerContext, + final ValidatorContext validatorContext ) { super(opTab, catalogReader, typeFactory, validatorConfig); this.plannerContext = plannerContext; + this.validatorContext = validatorContext; } @Override @@ -113,6 +171,631 @@ public void validateWindow(SqlNode windowOrId, SqlValidatorScope scope, @Nullabl super.validateWindow(windowOrId, scope, call); } + @Override + public void validateInsert(final SqlInsert insert) + { + final DruidSqlIngest ingestNode = (DruidSqlIngest) insert; + if (insert.isUpsert()) { + throw InvalidSqlInput.exception("UPSERT is not supported."); + } + + + // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported. + final String operationName = insert.getOperator().getName(); + if (insert.getTargetColumnList() != null) { + throw InvalidSqlInput.exception( + "Operation [%s] cannot be run with a target column list, given [%s (%s)]", + operationName, + ingestNode.getTargetTable(), ingestNode.getTargetColumnList() + ); + } + + // The target namespace is both the target table ID and the row type for that table. + final SqlValidatorNamespace targetNamespace = getNamespace(insert); + final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace; + // The target is a new or existing datasource. + final DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName); + + // An existing datasource may have metadata. + final DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata(); + + // Validate segment granularity, which depends on nothing else. + if (!(ingestNode.getTargetTable() instanceof ExternalDestinationSqlIdentifier)) { + validateSegmentGranularity(operationName, ingestNode, tableMetadata); + } + + // The source must be a SELECT + final SqlNode source = insert.getSource(); + // why? ensureNoOrderBy(source, operationName); + + // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause + //final SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata); + rewriteClusteringToOrderBy(source, ingestNode, null); + + // Validate the source statement. Validates the ORDER BY pushed down in the above step. + // Because of the non-standard Druid semantics, we can't define the target type: we don't know + // the target columns yet, and we can't infer types when they must come from the SELECT. + // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT + // usually defines the target types, unless the catalog says otherwise. Since catalog entries + // are optional, we don't know the target type until we validate the SELECT. (Also, we won't + // know names and we match by name.) 
Thus, we'd have to validate (to know names and types) + // to get the target types, but we need the target types to validate. Catch-22. So, we punt. + final SqlValidatorScope scope; + if (source instanceof SqlSelect) { + final SqlSelect sqlSelect = (SqlSelect) source; + validateSelect(sqlSelect, unknownType); + scope = null; + } else { + scope = scopes.get(source); + validateQuery(source, scope, unknownType); + } + + final SqlValidatorNamespace sourceNamespace = namespaces.get(source); + final RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType(); + + // Validate the __time column + int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN); + if (timeColumnIndex != -1) { + validateTimeColumn(sourceType, timeColumnIndex); + } + + // Validate clustering against the SELECT row type. Clustering has additional + // constraints beyond what was validated for the pushed-down ORDER BY. + // Though we pushed down clustering above, only now can we validate it after + // we've determined the SELECT row type. + //validateClustering(sourceType, ingestNode, catalogClustering); + + // Determine the output (target) schema. + final RelDataType targetType = validateTargetType(scope, insertNs, insert, sourceType, tableMetadata); + + // Set the type for the INSERT/REPLACE node + setValidatedNodeType(insert, targetType); + + // Segment size + if (tableMetadata != null && !validatorContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) { + final Integer targetSegmentRows = tableMetadata.targetSegmentRows(); + if (targetSegmentRows != null) { + validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows); + } + } + } + + /** + * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource, + * or insert into an existing one. If the target exists, it must be a datasource. If it + * does not exist, the target must be in the datasource schema, normally "druid". + */ + private DatasourceTable validateInsertTarget( + final SqlValidatorNamespace targetNamespace, + final IdentifierNamespace insertNs, + final String operationName + ) + { + // Get the target table ID + final SqlIdentifier destId = insertNs.getId(); + if (destId.names.isEmpty()) { + // I don't think this can happen, but include a branch for it just in case. + throw InvalidSqlInput.exception("%s requires a target table.", operationName); + } + + // Druid does not support 3+ part names. + final int n = destId.names.size(); + if (n > 2) { + throw InvalidSqlInput.exception("Druid does not support 3+ part names: [%s]", destId, operationName); + } + String tableName = destId.names.get(n - 1); + + // If this is a 2-part name, the first part must be the datasource schema. + if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) { + throw InvalidSqlInput.exception( + "Table [%s] does not support operation [%s] because it is not a Druid datasource", + destId, + operationName + ); + } + try { + // Try to resolve the table. Will fail if this is an INSERT into a new table. + validateNamespace(targetNamespace, unknownType); + SqlValidatorTable target = insertNs.resolve().getTable(); + try { + return target.unwrap(DatasourceTable.class); + } + catch (Exception e) { + throw InvalidSqlInput.exception( + "Table [%s] does not support operation [%s] because it is not a Druid datasource", + destId, + operationName + ); + } + } + catch (CalciteContextException e) { + // Something failed. Let's make sure it was the table lookup. 
+ // The check is kind of a hack, but its the best we can do given that Calcite + // didn't expect this non-SQL use case. + if (e.getCause() instanceof SqlValidatorException && e.getMessage() + .contains(StringUtils.format("Object '%s' not found", tableName))) { + // The catalog implementation may be "strict": and require that the target + // table already exists, rather than the default "lenient" mode that can + // create a new table. + if (validatorContext.catalog().ingestRequiresExistingTable()) { + throw InvalidSqlInput.exception("Cannot %s into [%s] because it does not exist", operationName, destId); + } + // New table. Validate the shape of the name. + IdUtils.validateId("table", tableName); + return null; + } + throw e; + } + } + + private void validateSegmentGranularity( + final String operationName, + final DruidSqlIngest ingestNode, + final DatasourceFacade tableMetadata + ) + { + final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity(); + if (definedGranularity != null) { + // Should already have been checked when creating the catalog entry + DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(null, definedGranularity); + } + final Granularity ingestionGranularity = ingestNode.getPartitionedBy().getGranularity(); + if (ingestionGranularity != null) { + DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(ingestNode, ingestionGranularity); + } + final Granularity finalGranularity; + if (definedGranularity == null) { + // The catalog has no granularity: apply the query value + if (ingestionGranularity == null) { + // Neither have a value: error + throw InvalidSqlInput.exception( + "Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", + operationName + ); + } else { + finalGranularity = ingestionGranularity; + } + } else { + // The catalog has a granularity + if (ingestionGranularity == null) { + // The query has no granularity: just apply the catalog granularity. + finalGranularity = definedGranularity; + } else if (definedGranularity.equals(ingestionGranularity)) { + // Both have a setting and they are the same. We assume this would + // likely occur only when moving to the catalog, and old queries still + // contain the PARTITION BY clause. + finalGranularity = definedGranularity; + } else { + // Both have a setting but they are different. Since the user declared + // the grain, using a different one is an error. If the user wants to + // vary the grain across different (re)ingestions, then, at present, don't + // declare the grain in the catalog. + // TODO: allow mismatch + throw InvalidSqlInput.exception( + "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]", + granularityToSqlString(definedGranularity), + granularityToSqlString(ingestionGranularity) + ); + } + } + + // Note: though this is the validator, we cheat a bit and write the target + // granularity into the query context. Perhaps this step should be done + // during conversion, however, we've just worked out the granularity, so we + // do it here instead. 
+ try { + validatorContext.queryContextMap().put( + DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, + validatorContext.jsonMapper().writeValueAsString(finalGranularity) + ); + } + catch (JsonProcessingException e) { + throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", finalGranularity); + } + } + + private String granularityToSqlString(final Granularity gran) + { + if (gran == null) { + return "NULL"; + } + // The validation path will only ever see the ALL granularity or + // a period granularity. Neither the parser nor catalog can + // create a Duration granularity. + if (Granularities.ALL == gran) { + return "ALL TIME"; + } + return ((PeriodGranularity) gran).getPeriod().toString(); + } + + private void ensureNoOrderBy( + SqlNode source, + final String operationName + ) + { + // The source SELECT cannot include an ORDER BY clause. Ordering is given + // by the CLUSTERED BY clause, if any. + // Check that an ORDER BY clause is not provided by the underlying query + SqlNodeList orderByList; + if (source instanceof SqlOrderBy) { + throw InvalidSqlInput.exception( + "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", + operationName + ); + } + + // Pull the SELECT statement out of the WITH clause + if (source instanceof SqlWith) { + source = ((SqlWith) source).getOperandList().get(1); + } + // If the child of INSERT or WITH is not SELECT, then the statement is not valid. + if (!(source instanceof SqlSelect)) { + throw InvalidSqlInput.exception( + "%s is not supported within %s %s statement.", + source.getKind(), + statementArticle(operationName), + operationName + ); + } + + // Verify that the SELECT has no ORDER BY clause + SqlSelect select = (SqlSelect) source; + orderByList = select.getOrderList(); + if (orderByList != null && orderByList.size() != 0) { + throw InvalidSqlInput.exception( + "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", + operationName + ); + } + } + + private String statementArticle(final String operationName) + { + return "INSERT".equals(operationName) ? "an" : "a"; + } + + + + // This part is a bit sad. By the time we get here, the validator will have created + // the ORDER BY namespace if we had a real ORDER BY. We have to "catch up" and do the + // work that registerQuery() should have done. That's kind of OK. But, the orderScopes + // variable is private, so we have to play dirty tricks to get at it. + // + // Warning: this may no longer work if Java forbids access to private fields in a + // future release. + private static final Field ORDER_SCOPES_FIELD; + + static { + try { + // TODO: this class has changed, and the orderScopes field no longer exists. + ORDER_SCOPES_FIELD = FieldUtils.getDeclaredField( + SqlValidatorImpl.class, + "scopes", + true + ); + } + catch (RuntimeException e) { + throw new ISE(e, "SqlValidatorImpl.scopes is not accessible"); + } + } + + + + /** + * Clustering is a kind of ordering. We push the CLUSTERED BY clause into the source query as + * an ORDER BY clause. In an ideal world, clustering would be outside of SELECT: it is an operation + * applied after the data is selected. For now, we push it into the SELECT, then MSQ pulls it back + * out. This is unfortunate as errors will be attributed to ORDER BY, which the user did not + * actually specify (it is an error to do so.) 
However, with the current hybrid structure, it is + * not possible to add the ORDER by later: doing so requires access to the order by namespace + * which is not visible to subclasses. + */ + private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest ingestNode, SqlNodeList catalogClustering) + { + /* + SqlNodeList clusteredBy = ingestNode.getClusteredBy(); + if (clusteredBy == null || clusteredBy.getList().isEmpty()) { + if (catalogClustering == null || catalogClustering.getList().isEmpty()) { + return; + } + clusteredBy = catalogClustering; + } + while (source instanceof SqlWith) { + source = ((SqlWith) source).getOperandList().get(1); + } + final SqlSelect select = (SqlSelect) source; + + select.setOrderBy(clusteredBy); + final OrderByScope orderScope = ValidatorShim.newOrderByScope(scopes.get(select), clusteredBy, select); + try { + @SuppressWarnings("unchecked") + final Map orderScopes = + (Map) ORDER_SCOPES_FIELD.get(this); + orderScopes.put(select, orderScope); + } + catch (Exception e) { + throw new ISE(e, "orderScopes is not accessible"); + } + + */ + } + + private void validateTimeColumn( + final RelRecordType sourceType, + final int timeColumnIndex + ) + { + final RelDataTypeField timeCol = sourceType.getFieldList().get(timeColumnIndex); + final RelDataType timeColType = timeCol.getType(); + if (timeColType instanceof BasicSqlType) { + final BasicSqlType timeColSqlType = (BasicSqlType) timeColType; + final SqlTypeName timeColSqlTypeName = timeColSqlType.getSqlTypeName(); + if (timeColSqlTypeName == SqlTypeName.BIGINT || timeColSqlTypeName == SqlTypeName.TIMESTAMP) { + return; + } + } + throw InvalidSqlInput.exception( + "Column [%s] is being used as the time column. It must be of type BIGINT or TIMESTAMP, got [%s]", + timeCol.getName(), + timeColType + ); + } + + /** + * Verify clustering which can come from the query, the catalog or both. If both, + * the two must match. In either case, the cluster keys must be present in the SELECT + * clause. The {@code __time} column cannot be included. + */ + private void validateClustering( + final RelRecordType sourceType, + final DruidSqlIngest ingestNode, + final SqlNodeList catalogClustering + ) + { + final SqlNodeList clusteredBy = ingestNode.getClusteredBy(); + + // Validate both the catalog and query definitions if present. This ensures + // that things are sane if we later check that the two are identical. + if (clusteredBy != null) { + validateClusteredBy(sourceType, clusteredBy); + } + if (catalogClustering != null) { + // Catalog defines the key columns. Verify that they are present in the query. + validateClusteredBy(sourceType, catalogClustering); + } + if (clusteredBy != null && catalogClustering != null) { + // Both the query and catalog have keys. + verifyQueryClusterByMatchesCatalog(sourceType, catalogClustering, clusteredBy); + } + } + + /** + * Validate the CLUSTERED BY list. Members can be any of the following: + *
<p>
+ * {@code CLUSTERED BY [<ordinal> | <id> | <expr>] DESC?} + * <p>
+ * Ensure that each id exists. Ensure each column is included only once. + * For an expression, just ensure it is valid; we don't check for duplicates. + */ + private void validateClusteredBy( + final RelRecordType sourceType, + final SqlNodeList clusteredBy + ) + { + // Keep track of fields which have been referenced. + final List fieldNames = sourceType.getFieldNames(); + final int fieldCount = fieldNames.size(); + final boolean[] refs = new boolean[fieldCount]; + + // Process cluster keys + for (SqlNode clusterKey : clusteredBy) { + final Pair key = resolveClusterKey(clusterKey, fieldNames); + // If an expression, index is null. Validation was done in the ORDER BY check. + // Else, do additional MSQ-specific checks. + if (key != null) { + int index = key.left; + // No duplicate references + if (refs[index]) { + throw InvalidSqlInput.exception("Duplicate CLUSTERED BY key: [%s]", clusterKey); + } + refs[index] = true; + } + } + } + + private Pair resolveClusterKey(SqlNode clusterKey, final List fieldNames) + { + boolean desc = false; + + // Check if the key is compound: only occurs for DESC. The ASC + // case is abstracted away by the parser. + if (clusterKey instanceof SqlBasicCall) { + SqlBasicCall basicCall = (SqlBasicCall) clusterKey; + if (basicCall.getOperator() == SqlStdOperatorTable.DESC) { + // Cluster key is compound: CLUSTERED BY foo DESC + // We check only the first element + clusterKey = ((SqlBasicCall) clusterKey).getOperandList().get(0); + desc = true; + } + } + + // We now have the actual key. Handle the three cases. + if (clusterKey instanceof SqlNumericLiteral) { + // Key is an ordinal: CLUSTERED BY 2 + // Ordinals are 1-based. + final int ord = ((SqlNumericLiteral) clusterKey).intValue(true); + final int index = ord - 1; + + // The ordinal has to be in range. + if (index < 0 || fieldNames.size() <= index) { + throw InvalidSqlInput.exception( + "CLUSTERED BY ordinal [%d] should be non-negative and <= the number of fields [%d]", + ord, + fieldNames.size() + ); + } + return new Pair<>(index, desc); + } else if (clusterKey instanceof SqlIdentifier) { + // Key is an identifier: CLUSTERED BY foo + final SqlIdentifier key = (SqlIdentifier) clusterKey; + + // Only key of the form foo are allowed, not foo.bar + if (!key.isSimple()) { + throw InvalidSqlInput.exception("CLUSTERED BY keys must be a simple name with no dots: [%s]", key.toString()); + } + + // The name must match an item in the select list + final String keyName = key.names.get(0); + // Slow linear search. We assume that there are not many cluster keys. + final int index = fieldNames.indexOf(keyName); + if (index == -1) { + throw InvalidSqlInput.exception("Unknown column [%s] in CLUSTERED BY", keyName); + } + return new Pair<>(index, desc); + } else { + // Key is an expression: CLUSTERED BY CEIL(m2) + return null; + } + } + + /** + * Both the catalog and query define clustering. This is allowed as long as they + * are identical. 
+ */ + private void verifyQueryClusterByMatchesCatalog( + final RelRecordType sourceType, + final SqlNodeList catalogClustering, + final SqlNodeList clusteredBy + ) + { + if (clusteredBy.size() != catalogClustering.size()) { + throw clusterKeyMismatchException(catalogClustering, clusteredBy); + } + final List fieldNames = sourceType.getFieldNames(); + for (int i = 0; i < clusteredBy.size(); i++) { + final SqlNode catalogKey = catalogClustering.get(i); + final SqlNode clusterKey = clusteredBy.get(i); + final Pair catalogPair = resolveClusterKey(catalogKey, fieldNames); + final Pair queryPair = resolveClusterKey(clusterKey, fieldNames); + + // Cluster keys in the catalog must be field references. If unresolved, + // we would have gotten an error above. Here we make sure that both + // indexes are the same. Since the catalog index can't be null, we're + // essentially checking that the indexes are the same: they name the same + // column. + if (!Objects.equals(catalogPair, queryPair)) { + throw clusterKeyMismatchException(catalogClustering, clusteredBy); + } + } + } + + private RuntimeException clusterKeyMismatchException(SqlNodeList catalogClustering, SqlNodeList clusterKeys) + { + throw InvalidSqlInput.exception( + "CLUSTER BY mismatch. Catalog: [%s], query: [%s]", + catalogClustering, + clusterKeys + ); + } + + /** + * Compute and validate the target type. In normal SQL, the engine would insert + * a project operator after the SELECT before the write to cast columns from the + * input type to the (compatible) defined output type. Druid doesn't work that way. + * In MSQ, the output the just is the input type. If the user wants to control the + * output type, then the user must manually insert any required CAST: Druid is not + * in the business of changing the type to suit the catalog. + *
<p>
+ * As a result, we first propagate column names and types using Druid rules: the + * output is exactly what SELECT says it is. We then apply restrictions from the + * catalog. If the table is strict, only column names from the catalog can be + * used. + */ + private RelDataType validateTargetType( + SqlValidatorScope scope, + final IdentifierNamespace insertNs, + SqlInsert insert, + RelRecordType sourceType, + DatasourceFacade tableMetadata + ) + { + final List sourceFields = sourceType.getFieldList(); + for (int i = 0; i < sourceFields.size(); i++) { + final RelDataTypeField sourceField = sourceFields.get(i); + // Check that there are no unnamed columns in the insert. + if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) { + throw InvalidSqlInput.exception( + "Insertion requires columns to be named, but at least one of the columns was unnamed. This is usually " + + "the result of applying a function without having an AS clause, please ensure that all function calls" + + "are named with an AS clause as in \"func(X) as myColumn\"." + ); + } + } + if (tableMetadata == null) { + return sourceType; + } + final boolean isStrict = tableMetadata.isSealed(); + final List> fields = new ArrayList<>(); + for (RelDataTypeField sourceField : sourceFields) { + final String colName = sourceField.getName(); + final DatasourceFacade.ColumnFacade definedCol = tableMetadata.column(colName); + if (definedCol == null) { + // No catalog definition for this column. + if (isStrict) { + // Table is strict: cannot add new columns at ingest time. + throw InvalidSqlInput.exception( + "Column [%s] is not defined in the target table [%s] strict schema", + colName, + insert.getTargetTable() + ); + } + + // Table is not strict: add a new column based on the SELECT column. + fields.add(Pair.of(colName, sourceField.getType())); + continue; + } + + // If the column name is defined, but no type is given then, use the + // column type from SELECT. + if (!definedCol.hasType()) { + fields.add(Pair.of(colName, sourceField.getType())); + continue; + } + + // Both the column name and type are provided. Use the name and type + // from the catalog. + // Note to future readers: this check is preliminary. It works for the + // simple column types and has not yet been extended to complex types, aggregates, + // types defined in extensions, etc. It may be that SQL + // has types that Druid cannot store. This may crop up with types defined in + // extensions which are not loaded. Those details are not known at the time + // of this code so we are not yet in a position to make the right decision. + // This is a task to be revisited when we have more information. + final String sqlTypeName = definedCol.sqlStorageType(); + if (sqlTypeName == null) { + // Don't know the storage type. Just skip this one: Druid types are + // fluid so let Druid sort out what to store. This is probably not the right + // answer, but should avoid problems until full type system support is completed. + fields.add(Pair.of(colName, sourceField.getType())); + continue; + } + RelDataType relType = typeFactory.createSqlType(SqlTypeName.get(sqlTypeName)); + fields.add(Pair.of( + colName, + typeFactory.createTypeWithNullability(relType, true) + )); + } + + // Perform the SQL-standard check: that the SELECT column can be + // converted to the target type. This check is retained to mimic SQL + // behavior, but doesn't do anything because we enforced exact type + // matches above. 
+ final RelDataType targetType = typeFactory.createStructType(fields); + final SqlValidatorTable target = insertNs.resolve().getTable(); + checkTypeAssignment(scope, target, sourceType, targetType, insert); + return targetType; + } + private boolean isPrecedingOrFollowing(@Nullable SqlNode bound) { if (bound == null) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java index 0448a7245f82..3bd4908a3955 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java @@ -32,6 +32,8 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.tools.ValidationException; import org.apache.calcite.util.Pair; import org.apache.druid.common.utils.IdUtils; @@ -61,44 +63,20 @@ public abstract class IngestHandler extends QueryHandler { private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); - protected final Granularity ingestionGranularity; + protected Granularity ingestionGranularity; protected IngestDestination targetDatasource; + private SqlNode validatedQueryNode; + private RelDataType targetType; + IngestHandler( HandlerContext handlerContext, - DruidSqlIngest ingestNode, - SqlNode queryNode, SqlExplain explain ) { - super(handlerContext, queryNode, explain); - ingestionGranularity = ingestNode.getPartitionedBy() != null ? ingestNode.getPartitionedBy().getGranularity() : null; - handlerContext.hook().captureInsert(ingestNode); - } - - protected static SqlNode convertQuery(DruidSqlIngest sqlNode) - { - SqlNode query = sqlNode.getSource(); - - // Check if ORDER BY clause is not provided to the underlying query - if (query instanceof SqlOrderBy) { - SqlOrderBy sqlOrderBy = (SqlOrderBy) query; - SqlNodeList orderByList = sqlOrderBy.orderList; - if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { - throw InvalidSqlInput.exception( - "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", - sqlNode.getOperator().getName() - ); - } - } - if (sqlNode.getClusteredBy() != null) { - query = DruidSqlParserUtils.convertClusterByToOrderBy(query, sqlNode.getClusteredBy()); - } - - if (!query.isA(SqlKind.QUERY)) { - throw InvalidSqlInput.exception("Unexpected SQL statement type [%s], expected it to be a QUERY", query.getKind()); - } - return query; + super(handlerContext, explain); + //ingestionGranularity = ingestNode.getPartitionedBy() != null ? ingestNode.getPartitionedBy().getGranularity() : null; + //handlerContext.hook().captureInsert(ingestNode); } protected String operationName() @@ -170,7 +148,6 @@ public void validate() catch (JsonProcessingException e) { throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", ingestionGranularity); } - super.validate(); // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes // the number of rows inserted to be limited which is likely to be confusing and unintended. 
if (handlerContext.queryContextMap().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) { @@ -180,9 +157,24 @@ public void validate() operationName() ); } + DruidSqlIngest ingestNode = ingestNode(); + DruidSqlIngest validatedNode = (DruidSqlIngest) validate(ingestNode); + validatedQueryNode = validatedNode.getSource(); + CalcitePlanner planner = handlerContext.planner(); + final SqlValidator validator = planner.getValidator(); + targetType = validator.getValidatedNodeType(validatedNode); + ingestionGranularity = ingestNode().getPartitionedBy() != null + ? ingestNode().getPartitionedBy().getGranularity() + : null; targetDatasource = validateAndGetDataSourceForIngest(); } + @Override + protected SqlNode validatedQueryNode() + { + return validatedQueryNode; + } + @Override protected RelDataType returnedRowType() { @@ -299,13 +291,42 @@ public InsertHandler( SqlExplain explain ) { - super( - handlerContext, - sqlNode, - convertQuery(sqlNode), - explain - ); - this.sqlNode = sqlNode; + super(handlerContext, explain); + this.sqlNode = convertQuery(sqlNode); + handlerContext.hook().captureInsert(sqlNode); + } + + protected static DruidSqlInsert convertQuery(DruidSqlIngest sqlNode) + { + SqlNode query = sqlNode.getSource(); + + // Check if ORDER BY clause is not provided to the underlying query + if (query instanceof SqlOrderBy) { + SqlOrderBy sqlOrderBy = (SqlOrderBy) query; + SqlNodeList orderByList = sqlOrderBy.orderList; + if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { + throw InvalidSqlInput.exception( + "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", + sqlNode.getOperator().getName() + ); + } + } + if (sqlNode.getClusteredBy() != null) { + query = DruidSqlParserUtils.convertClusterByToOrderBy(query, sqlNode.getClusteredBy()); + } + + if (!query.isA(SqlKind.QUERY)) { + throw InvalidSqlInput.exception("Unexpected SQL statement type [%s], expected it to be a QUERY", query.getKind()); + } + return DruidSqlInsert.create(new SqlInsert( + sqlNode.getParserPosition(), + (SqlNodeList) sqlNode.getOperandList().get(0), + sqlNode.getOperandList().get(1), + query, + (SqlNodeList) sqlNode.getOperandList().get(3)), + sqlNode.getPartitionedBy(), + sqlNode.getClusteredBy(), + sqlNode.getExportFileFormat()); } @Override @@ -355,11 +376,48 @@ public ReplaceHandler( { super( handlerContext, - sqlNode, - convertQuery(sqlNode), explain ); - this.sqlNode = sqlNode; + this.sqlNode = convertQuery(sqlNode); + handlerContext.hook().captureInsert(sqlNode); + } + + protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode) + { + SqlNode query = sqlNode.getSource(); + + // Check if ORDER BY clause is not provided to the underlying query + if (query instanceof SqlOrderBy) { + SqlOrderBy sqlOrderBy = (SqlOrderBy) query; + SqlNodeList orderByList = sqlOrderBy.orderList; + if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { + throw InvalidSqlInput.exception( + "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", + sqlNode.getOperator().getName() + ); + } + } + if (sqlNode.getClusteredBy() != null) { + query = DruidSqlParserUtils.convertClusterByToOrderBy(query, sqlNode.getClusteredBy()); + sqlNode.setSource((SqlSelect) (((SqlOrderBy) query).query)); + } + + if (!query.isA(SqlKind.QUERY)) { + throw InvalidSqlInput.exception("Unexpected SQL statement type [%s], expected it to be a QUERY", query.getKind()); + } + return DruidSqlReplace.create( + new SqlInsert( + sqlNode.getParserPosition(), + (SqlNodeList) 
sqlNode.getOperandList().get(0), + sqlNode.getOperandList().get(1), + query, + (SqlNodeList) sqlNode.getOperandList().get(3) + ), + sqlNode.getPartitionedBy(), + sqlNode.getClusteredBy(), + sqlNode.getReplaceTimeQuery(), + sqlNode.getExportFileFormat() + ); } @Override @@ -390,12 +448,12 @@ public void validate() ); } + super.validate(); List replaceIntervalsList = DruidSqlParserUtils.validateQueryAndConvertToIntervals( replaceTimeQuery, ingestionGranularity, handlerContext.timeZone() ); - super.validate(); if (replaceIntervalsList != null) { replaceIntervals = String.join(",", replaceIntervalsList); handlerContext.queryContextMap().put( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 0a570efa32f1..86217eb59ba6 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -93,27 +93,24 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand { static final EmittingLogger log = new EmittingLogger(QueryHandler.class); - protected SqlNode queryNode; protected SqlExplain explain; - protected SqlNode validatedQueryNode; private boolean isPrepared; protected RelRoot rootQueryRel; private PrepareResult prepareResult; protected RexBuilder rexBuilder; - public QueryHandler(SqlStatementHandler.HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain) + public QueryHandler(HandlerContext handlerContext, SqlExplain explain) { super(handlerContext); - this.queryNode = sqlNode; this.explain = explain; } - @Override - public void validate() + protected SqlNode validate(SqlNode root) { CalcitePlanner planner = handlerContext.planner(); + SqlNode validatedQueryNode; try { - validatedQueryNode = planner.validate(rewriteParameters()); + validatedQueryNode = planner.validate(rewriteParameters(root)); } catch (ValidationException e) { throw DruidPlanner.translateException(e); @@ -126,9 +123,10 @@ public void validate() ); validatedQueryNode.accept(resourceCollectorShuttle); resourceActions = resourceCollectorShuttle.getResourceActions(); + return validatedQueryNode; } - private SqlNode rewriteParameters() + private SqlNode rewriteParameters(SqlNode original) { // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral} @@ -140,9 +138,9 @@ private SqlNode rewriteParameters() // contains parameters, but no values were provided. 
PlannerContext plannerContext = handlerContext.plannerContext(); if (plannerContext.getParameters().isEmpty()) { - return queryNode; + return original; } else { - return queryNode.accept(new SqlParameterizerShuttle(plannerContext)); + return original.accept(new SqlParameterizerShuttle(plannerContext)); } } @@ -153,6 +151,7 @@ public void prepare() return; } isPrepared = true; + SqlNode validatedQueryNode = validatedQueryNode(); rootQueryRel = handlerContext.planner().rel(validatedQueryNode); handlerContext.hook().captureQueryRel(rootQueryRel); final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory(); @@ -177,6 +176,8 @@ public PrepareResult prepareResult() return prepareResult; } + protected abstract SqlNode validatedQueryNode(); + protected abstract RelDataType returnedRowType(); private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory) @@ -712,13 +713,17 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e public static class SelectHandler extends QueryHandler { + private final SqlNode queryNode; + private SqlNode validatedQueryNode; + public SelectHandler( HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain ) { - super(handlerContext, sqlNode, explain); + super(handlerContext, explain); + this.queryNode = sqlNode; } @Override @@ -727,7 +732,13 @@ public void validate() if (!handlerContext.plannerContext().featureAvailable(EngineFeature.CAN_SELECT)) { throw InvalidSqlInput.exception("Cannot execute SELECT with SQL engine [%s]", handlerContext.engine().name()); } - super.validate(); + validatedQueryNode = validate(queryNode); + } + + @Override + protected SqlNode validatedQueryNode() + { + return validatedQueryNode; } @Override diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index 515796948fa1..eded1b7fb776 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1679,7 +1679,6 @@ public void testErrorWithUnableToConstructColumnSignatureWithExtern() + "partitioned by DAY\n" + "clustered by channel"; HashMap context = new HashMap<>(DEFAULT_CONTEXT); - context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); testIngestionQuery().context(context).sql(sqlString) .expectValidationError( new DruidExceptionMatcher( @@ -1708,7 +1707,6 @@ public void testErrorWhenBothRowSignatureAndExtendsProvidedToExtern() + "partitioned by DAY\n" + "clustered by channel"; HashMap context = new HashMap<>(DEFAULT_CONTEXT); - context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); testIngestionQuery().context(context).sql(sqlString) .expectValidationError( new DruidExceptionMatcher( @@ -1736,7 +1734,6 @@ public void testErrorWhenNoneOfRowSignatureAndExtendsProvidedToExtern() + "partitioned by DAY\n" + "clustered by channel"; HashMap context = new HashMap<>(DEFAULT_CONTEXT); - context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); testIngestionQuery().context(context).sql(sqlString) .expectValidationError( new DruidExceptionMatcher( @@ -1765,7 +1762,6 @@ public void testErrorWhenInputSourceInvalid() + "partitioned by DAY\n" + "clustered by channel"; HashMap context = new HashMap<>(DEFAULT_CONTEXT); - context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); testIngestionQuery().context(context).sql(sqlString) .expectValidationError( new DruidExceptionMatcher( From 6579bd8e5de926cc9b834737348e9ec8df98210a Mon 
Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 14 Feb 2024 14:02:13 -0500 Subject: [PATCH 02/12] * remove dead code --- .../calcite/planner/DruidSqlValidator.java | 142 +----------------- 1 file changed, 3 insertions(+), 139 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index 895a4dd5fe2b..2b04b4114655 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -38,29 +38,24 @@ import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlNumericLiteral; import org.apache.calcite.sql.SqlOperatorTable; -import org.apache.calcite.sql.SqlOrderBy; import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWindow; -import org.apache.calcite.sql.SqlWith; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.BasicSqlType; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.IdentifierNamespace; import org.apache.calcite.sql.validate.SqlValidatorException; -import org.apache.calcite.sql.validate.SqlValidatorImpl; import org.apache.calcite.sql.validate.SqlValidatorNamespace; import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.sql.validate.SqlValidatorTable; import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; -import org.apache.commons.lang.reflect.FieldUtils; import org.apache.druid.catalog.model.Columns; import org.apache.druid.catalog.model.facade.DatasourceFacade; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.InvalidSqlInput; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; @@ -74,7 +69,6 @@ import org.apache.druid.sql.calcite.table.DatasourceTable; import org.checkerframework.checker.nullness.qual.Nullable; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -206,13 +200,8 @@ public void validateInsert(final SqlInsert insert) // The source must be a SELECT final SqlNode source = insert.getSource(); - // why? ensureNoOrderBy(source, operationName); - // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause - //final SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata); - rewriteClusteringToOrderBy(source, ingestNode, null); - - // Validate the source statement. Validates the ORDER BY pushed down in the above step. + // Validate the source statement. // Because of the non-standard Druid semantics, we can't define the target type: we don't know // the target columns yet, and we can't infer types when they must come from the SELECT. // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT @@ -239,11 +228,7 @@ public void validateInsert(final SqlInsert insert) validateTimeColumn(sourceType, timeColumnIndex); } - // Validate clustering against the SELECT row type. Clustering has additional - // constraints beyond what was validated for the pushed-down ORDER BY. 
- // Though we pushed down clustering above, only now can we validate it after - // we've determined the SELECT row type. - //validateClustering(sourceType, ingestNode, catalogClustering); + validateClustering(sourceType, ingestNode); // Determine the output (target) schema. final RelDataType targetType = validateTargetType(scope, insertNs, insert, sourceType, tableMetadata); @@ -408,118 +393,6 @@ private String granularityToSqlString(final Granularity gran) return ((PeriodGranularity) gran).getPeriod().toString(); } - private void ensureNoOrderBy( - SqlNode source, - final String operationName - ) - { - // The source SELECT cannot include an ORDER BY clause. Ordering is given - // by the CLUSTERED BY clause, if any. - // Check that an ORDER BY clause is not provided by the underlying query - SqlNodeList orderByList; - if (source instanceof SqlOrderBy) { - throw InvalidSqlInput.exception( - "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", - operationName - ); - } - - // Pull the SELECT statement out of the WITH clause - if (source instanceof SqlWith) { - source = ((SqlWith) source).getOperandList().get(1); - } - // If the child of INSERT or WITH is not SELECT, then the statement is not valid. - if (!(source instanceof SqlSelect)) { - throw InvalidSqlInput.exception( - "%s is not supported within %s %s statement.", - source.getKind(), - statementArticle(operationName), - operationName - ); - } - - // Verify that the SELECT has no ORDER BY clause - SqlSelect select = (SqlSelect) source; - orderByList = select.getOrderList(); - if (orderByList != null && orderByList.size() != 0) { - throw InvalidSqlInput.exception( - "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", - operationName - ); - } - } - - private String statementArticle(final String operationName) - { - return "INSERT".equals(operationName) ? "an" : "a"; - } - - - - // This part is a bit sad. By the time we get here, the validator will have created - // the ORDER BY namespace if we had a real ORDER BY. We have to "catch up" and do the - // work that registerQuery() should have done. That's kind of OK. But, the orderScopes - // variable is private, so we have to play dirty tricks to get at it. - // - // Warning: this may no longer work if Java forbids access to private fields in a - // future release. - private static final Field ORDER_SCOPES_FIELD; - - static { - try { - // TODO: this class has changed, and the orderScopes field no longer exists. - ORDER_SCOPES_FIELD = FieldUtils.getDeclaredField( - SqlValidatorImpl.class, - "scopes", - true - ); - } - catch (RuntimeException e) { - throw new ISE(e, "SqlValidatorImpl.scopes is not accessible"); - } - } - - - - /** - * Clustering is a kind of ordering. We push the CLUSTERED BY clause into the source query as - * an ORDER BY clause. In an ideal world, clustering would be outside of SELECT: it is an operation - * applied after the data is selected. For now, we push it into the SELECT, then MSQ pulls it back - * out. This is unfortunate as errors will be attributed to ORDER BY, which the user did not - * actually specify (it is an error to do so.) However, with the current hybrid structure, it is - * not possible to add the ORDER by later: doing so requires access to the order by namespace - * which is not visible to subclasses. 
- */ - private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest ingestNode, SqlNodeList catalogClustering) - { - /* - SqlNodeList clusteredBy = ingestNode.getClusteredBy(); - if (clusteredBy == null || clusteredBy.getList().isEmpty()) { - if (catalogClustering == null || catalogClustering.getList().isEmpty()) { - return; - } - clusteredBy = catalogClustering; - } - while (source instanceof SqlWith) { - source = ((SqlWith) source).getOperandList().get(1); - } - final SqlSelect select = (SqlSelect) source; - - select.setOrderBy(clusteredBy); - final OrderByScope orderScope = ValidatorShim.newOrderByScope(scopes.get(select), clusteredBy, select); - try { - @SuppressWarnings("unchecked") - final Map orderScopes = - (Map) ORDER_SCOPES_FIELD.get(this); - orderScopes.put(select, orderScope); - } - catch (Exception e) { - throw new ISE(e, "orderScopes is not accessible"); - } - - */ - } - private void validateTimeColumn( final RelRecordType sourceType, final int timeColumnIndex @@ -548,8 +421,7 @@ private void validateTimeColumn( */ private void validateClustering( final RelRecordType sourceType, - final DruidSqlIngest ingestNode, - final SqlNodeList catalogClustering + final DruidSqlIngest ingestNode ) { final SqlNodeList clusteredBy = ingestNode.getClusteredBy(); @@ -559,14 +431,6 @@ private void validateClustering( if (clusteredBy != null) { validateClusteredBy(sourceType, clusteredBy); } - if (catalogClustering != null) { - // Catalog defines the key columns. Verify that they are present in the query. - validateClusteredBy(sourceType, catalogClustering); - } - if (clusteredBy != null && catalogClustering != null) { - // Both the query and catalog have keys. - verifyQueryClusterByMatchesCatalog(sourceType, catalogClustering, clusteredBy); - } } /** From e019b707fc8ea3c08a22691da08b87235ea6a69e Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 14 Feb 2024 16:38:47 -0500 Subject: [PATCH 03/12] * fix failing tests, remove some uneeded validations that are done elsewhere --- .../calcite/planner/DruidSqlValidator.java | 186 +----------------- 1 file changed, 1 insertion(+), 185 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index 2b04b4114655..dc3d0bee9f25 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -29,21 +29,16 @@ import org.apache.calcite.rel.type.RelRecordType; import org.apache.calcite.runtime.CalciteContextException; import org.apache.calcite.runtime.CalciteException; -import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlNumericLiteral; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWindow; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.BasicSqlType; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.IdentifierNamespace; import org.apache.calcite.sql.validate.SqlValidatorException; @@ -52,7 +47,6 @@ import 
org.apache.calcite.sql.validate.SqlValidatorTable; import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; -import org.apache.druid.catalog.model.Columns; import org.apache.druid.catalog.model.facade.DatasourceFacade; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.InvalidSqlInput; @@ -72,7 +66,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.regex.Pattern; /** @@ -222,14 +215,6 @@ public void validateInsert(final SqlInsert insert) final SqlValidatorNamespace sourceNamespace = namespaces.get(source); final RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType(); - // Validate the __time column - int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN); - if (timeColumnIndex != -1) { - validateTimeColumn(sourceType, timeColumnIndex); - } - - validateClustering(sourceType, ingestNode); - // Determine the output (target) schema. final RelDataType targetType = validateTargetType(scope, insertNs, insert, sourceType, tableMetadata); @@ -393,174 +378,6 @@ private String granularityToSqlString(final Granularity gran) return ((PeriodGranularity) gran).getPeriod().toString(); } - private void validateTimeColumn( - final RelRecordType sourceType, - final int timeColumnIndex - ) - { - final RelDataTypeField timeCol = sourceType.getFieldList().get(timeColumnIndex); - final RelDataType timeColType = timeCol.getType(); - if (timeColType instanceof BasicSqlType) { - final BasicSqlType timeColSqlType = (BasicSqlType) timeColType; - final SqlTypeName timeColSqlTypeName = timeColSqlType.getSqlTypeName(); - if (timeColSqlTypeName == SqlTypeName.BIGINT || timeColSqlTypeName == SqlTypeName.TIMESTAMP) { - return; - } - } - throw InvalidSqlInput.exception( - "Column [%s] is being used as the time column. It must be of type BIGINT or TIMESTAMP, got [%s]", - timeCol.getName(), - timeColType - ); - } - - /** - * Verify clustering which can come from the query, the catalog or both. If both, - * the two must match. In either case, the cluster keys must be present in the SELECT - * clause. The {@code __time} column cannot be included. - */ - private void validateClustering( - final RelRecordType sourceType, - final DruidSqlIngest ingestNode - ) - { - final SqlNodeList clusteredBy = ingestNode.getClusteredBy(); - - // Validate both the catalog and query definitions if present. This ensures - // that things are sane if we later check that the two are identical. - if (clusteredBy != null) { - validateClusteredBy(sourceType, clusteredBy); - } - } - - /** - * Validate the CLUSTERED BY list. Members can be any of the following: - *

- * {@code CLUSTERED BY [<ordinal> | <id> | <expr>] DESC?} - *

- * Ensure that each id exists. Ensure each column is included only once. - * For an expression, just ensure it is valid; we don't check for duplicates. - */ - private void validateClusteredBy( - final RelRecordType sourceType, - final SqlNodeList clusteredBy - ) - { - // Keep track of fields which have been referenced. - final List fieldNames = sourceType.getFieldNames(); - final int fieldCount = fieldNames.size(); - final boolean[] refs = new boolean[fieldCount]; - - // Process cluster keys - for (SqlNode clusterKey : clusteredBy) { - final Pair key = resolveClusterKey(clusterKey, fieldNames); - // If an expression, index is null. Validation was done in the ORDER BY check. - // Else, do additional MSQ-specific checks. - if (key != null) { - int index = key.left; - // No duplicate references - if (refs[index]) { - throw InvalidSqlInput.exception("Duplicate CLUSTERED BY key: [%s]", clusterKey); - } - refs[index] = true; - } - } - } - - private Pair resolveClusterKey(SqlNode clusterKey, final List fieldNames) - { - boolean desc = false; - - // Check if the key is compound: only occurs for DESC. The ASC - // case is abstracted away by the parser. - if (clusterKey instanceof SqlBasicCall) { - SqlBasicCall basicCall = (SqlBasicCall) clusterKey; - if (basicCall.getOperator() == SqlStdOperatorTable.DESC) { - // Cluster key is compound: CLUSTERED BY foo DESC - // We check only the first element - clusterKey = ((SqlBasicCall) clusterKey).getOperandList().get(0); - desc = true; - } - } - - // We now have the actual key. Handle the three cases. - if (clusterKey instanceof SqlNumericLiteral) { - // Key is an ordinal: CLUSTERED BY 2 - // Ordinals are 1-based. - final int ord = ((SqlNumericLiteral) clusterKey).intValue(true); - final int index = ord - 1; - - // The ordinal has to be in range. - if (index < 0 || fieldNames.size() <= index) { - throw InvalidSqlInput.exception( - "CLUSTERED BY ordinal [%d] should be non-negative and <= the number of fields [%d]", - ord, - fieldNames.size() - ); - } - return new Pair<>(index, desc); - } else if (clusterKey instanceof SqlIdentifier) { - // Key is an identifier: CLUSTERED BY foo - final SqlIdentifier key = (SqlIdentifier) clusterKey; - - // Only key of the form foo are allowed, not foo.bar - if (!key.isSimple()) { - throw InvalidSqlInput.exception("CLUSTERED BY keys must be a simple name with no dots: [%s]", key.toString()); - } - - // The name must match an item in the select list - final String keyName = key.names.get(0); - // Slow linear search. We assume that there are not many cluster keys. - final int index = fieldNames.indexOf(keyName); - if (index == -1) { - throw InvalidSqlInput.exception("Unknown column [%s] in CLUSTERED BY", keyName); - } - return new Pair<>(index, desc); - } else { - // Key is an expression: CLUSTERED BY CEIL(m2) - return null; - } - } - - /** - * Both the catalog and query define clustering. This is allowed as long as they - * are identical. 
- */ - private void verifyQueryClusterByMatchesCatalog( - final RelRecordType sourceType, - final SqlNodeList catalogClustering, - final SqlNodeList clusteredBy - ) - { - if (clusteredBy.size() != catalogClustering.size()) { - throw clusterKeyMismatchException(catalogClustering, clusteredBy); - } - final List fieldNames = sourceType.getFieldNames(); - for (int i = 0; i < clusteredBy.size(); i++) { - final SqlNode catalogKey = catalogClustering.get(i); - final SqlNode clusterKey = clusteredBy.get(i); - final Pair catalogPair = resolveClusterKey(catalogKey, fieldNames); - final Pair queryPair = resolveClusterKey(clusterKey, fieldNames); - - // Cluster keys in the catalog must be field references. If unresolved, - // we would have gotten an error above. Here we make sure that both - // indexes are the same. Since the catalog index can't be null, we're - // essentially checking that the indexes are the same: they name the same - // column. - if (!Objects.equals(catalogPair, queryPair)) { - throw clusterKeyMismatchException(catalogClustering, clusteredBy); - } - } - } - - private RuntimeException clusterKeyMismatchException(SqlNodeList catalogClustering, SqlNodeList clusterKeys) - { - throw InvalidSqlInput.exception( - "CLUSTER BY mismatch. Catalog: [%s], query: [%s]", - catalogClustering, - clusterKeys - ); - } /** * Compute and validate the target type. In normal SQL, the engine would insert @@ -584,8 +401,7 @@ private RelDataType validateTargetType( ) { final List sourceFields = sourceType.getFieldList(); - for (int i = 0; i < sourceFields.size(); i++) { - final RelDataTypeField sourceField = sourceFields.get(i); + for (final RelDataTypeField sourceField : sourceFields) { // Check that there are no unnamed columns in the insert. if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) { throw InvalidSqlInput.exception( From 256517304623bd085da7d8531205d22e26e852b3 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 15 Feb 2024 14:31:01 -0500 Subject: [PATCH 04/12] * fix static check --- .../org/apache/druid/sql/calcite/planner/CalcitePlanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java index 00e4fe278b6a..2b1991972425 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java @@ -285,7 +285,7 @@ public final RelNode convert(SqlNode sql) public RelRoot rel(SqlNode sql) { ensure(CalcitePlanner.State.STATE_4_VALIDATED); - SqlNode validatedSqlNode = Objects.requireNonNull( + Objects.requireNonNull( this.validatedSqlNode, "validatedSqlNode is null. 
Need to call #validate() first" ); From d2f0e47f25939049f65db3b5d8a5a06a158d5e58 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 16 Feb 2024 13:22:29 -0500 Subject: [PATCH 05/12] * remove uneeded ValidatorContext --- .../sql/calcite/planner/CalcitePlanner.java | 7 ++--- .../sql/calcite/planner/DruidPlanner.java | 29 +------------------ .../calcite/planner/DruidSqlValidator.java | 17 +++++------ 3 files changed, 10 insertions(+), 43 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java index 2b1991972425..f2d0408b491d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java @@ -86,7 +86,6 @@ public class CalcitePlanner implements Planner, ViewExpander private final @Nullable RelOptCostFactory costFactory; private final Context context; private final CalciteConnectionConfig connectionConfig; - private final DruidSqlValidator.ValidatorContext validatorContext; private final RelDataTypeSystem typeSystem; /** @@ -119,10 +118,9 @@ public class CalcitePlanner implements Planner, ViewExpander * {@link org.apache.calcite.tools.Frameworks#getPlanner} instead. */ @SuppressWarnings("method.invocation.invalid") - public CalcitePlanner(FrameworkConfig config, DruidSqlValidator.ValidatorContext validatorContext) + public CalcitePlanner(FrameworkConfig config) { this.costFactory = config.getCostFactory(); - this.validatorContext = validatorContext; this.defaultSchema = config.getDefaultSchema(); this.operatorTable = config.getOperatorTable(); this.programs = config.getPrograms(); @@ -410,8 +408,7 @@ private SqlValidator createSqlValidator(CalciteCatalogReader catalogReader) catalogReader, getTypeFactory(), validatorConfig, - context.unwrapOrThrow(PlannerContext.class), - validatorContext + context.unwrapOrThrow(PlannerContext.class) ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 82052c0e4770..4b697a0d5dfa 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -119,7 +119,7 @@ public AuthResult( ) { this.frameworkConfig = frameworkConfig; - this.planner = new CalcitePlanner(frameworkConfig, new ValidatorContextImpl()); + this.planner = new CalcitePlanner(frameworkConfig); this.plannerContext = plannerContext; this.engine = engine; this.hook = hook == null ? 
NoOpPlannerHook.INSTANCE : hook; @@ -318,33 +318,6 @@ public PlannerHook hook() } } - public class ValidatorContextImpl implements DruidSqlValidator.ValidatorContext - { - @Override - public Map queryContextMap() - { - return plannerContext.queryContextMap(); - } - - @Override - public CatalogResolver catalog() - { - return plannerContext.getPlannerToolbox().catalogResolver(); - } - - @Override - public String druidSchemaName() - { - return plannerContext.getPlannerToolbox().druidSchemaName(); - } - - @Override - public ObjectMapper jsonMapper() - { - return plannerContext.getPlannerToolbox().jsonMapper(); - } - } - public static DruidException translateException(Exception e) { try { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index dc3d0bee9f25..d4a7eddd7986 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -88,20 +88,17 @@ public interface ValidatorContext } private final PlannerContext plannerContext; - private final ValidatorContext validatorContext; protected DruidSqlValidator( SqlOperatorTable opTab, CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory, Config validatorConfig, - PlannerContext plannerContext, - final ValidatorContext validatorContext + PlannerContext plannerContext ) { super(opTab, catalogReader, typeFactory, validatorConfig); this.plannerContext = plannerContext; - this.validatorContext = validatorContext; } @Override @@ -222,10 +219,10 @@ public void validateInsert(final SqlInsert insert) setValidatedNodeType(insert, targetType); // Segment size - if (tableMetadata != null && !validatorContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) { + if (tableMetadata != null && !plannerContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) { final Integer targetSegmentRows = tableMetadata.targetSegmentRows(); if (targetSegmentRows != null) { - validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows); + plannerContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows); } } } @@ -256,7 +253,7 @@ private DatasourceTable validateInsertTarget( String tableName = destId.names.get(n - 1); // If this is a 2-part name, the first part must be the datasource schema. - if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) { + if (n == 2 && !plannerContext.getPlannerToolbox().druidSchemaName().equals(destId.names.get(0))) { throw InvalidSqlInput.exception( "Table [%s] does not support operation [%s] because it is not a Druid datasource", destId, @@ -287,7 +284,7 @@ private DatasourceTable validateInsertTarget( // The catalog implementation may be "strict": and require that the target // table already exists, rather than the default "lenient" mode that can // create a new table. - if (validatorContext.catalog().ingestRequiresExistingTable()) { + if (plannerContext.getPlannerToolbox().catalogResolver().ingestRequiresExistingTable()) { throw InvalidSqlInput.exception("Cannot %s into [%s] because it does not exist", operationName, destId); } // New table. Validate the shape of the name. @@ -354,9 +351,9 @@ private void validateSegmentGranularity( // during conversion, however, we've just worked out the granularity, so we // do it here instead. 
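For illustration only — a minimal sketch (not from the patch) of the round trip described in the comment above, assuming a Druid-configured ObjectMapper such as DefaultObjectMapper that can serialize Granularity: the validator writes the resolved granularity into the query context as JSON, and a later planning stage reads it back.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;

import java.util.HashMap;
import java.util.Map;

class GranularityContextRoundTrip
{
  static Granularity roundTrip() throws JsonProcessingException
  {
    final ObjectMapper jsonMapper = new DefaultObjectMapper();
    final Map<String, Object> queryContext = new HashMap<>();

    // Validator side: store the resolved granularity as a JSON string under the shared context key.
    queryContext.put(
        DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
        jsonMapper.writeValueAsString(Granularities.HOUR)
    );

    // Handler side: read the same entry back into a Granularity instance.
    return jsonMapper.readValue(
        (String) queryContext.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY),
        Granularity.class
    );
  }
}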
try { - validatorContext.queryContextMap().put( + plannerContext.queryContextMap().put( DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, - validatorContext.jsonMapper().writeValueAsString(finalGranularity) + plannerContext.getPlannerToolbox().jsonMapper().writeValueAsString(finalGranularity) ); } catch (JsonProcessingException e) { From b475ecc827a2aa6111cc0db20b30f1792c944522 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 16 Feb 2024 13:56:56 -0500 Subject: [PATCH 06/12] * resolve getEffectiveGranularity comment * allow granularity mismatch * remove duplicate validation around unnamed columns --- .../calcite/planner/DruidSqlValidator.java | 118 +++++++----------- .../sql/calcite/planner/IngestHandler.java | 25 ---- 2 files changed, 48 insertions(+), 95 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index d4a7eddd7986..fac8e7889bff 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -51,9 +51,7 @@ import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.query.QueryContexts; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; @@ -185,7 +183,20 @@ public void validateInsert(final SqlInsert insert) // Validate segment granularity, which depends on nothing else. if (!(ingestNode.getTargetTable() instanceof ExternalDestinationSqlIdentifier)) { - validateSegmentGranularity(operationName, ingestNode, tableMetadata); + Granularity effectiveGranularity = getEffectiveGranularity(operationName, ingestNode, tableMetadata); + // Note: though this is the validator, we cheat a bit and write the target + // granularity into the query context. Perhaps this step should be done + // during conversion, however, we've just worked out the granularity, so we + // do it here instead. + try { + plannerContext.queryContextMap().put( + DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, + plannerContext.getPlannerToolbox().jsonMapper().writeValueAsString(effectiveGranularity) + ); + } + catch (JsonProcessingException e) { + throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", effectiveGranularity); + } } // The source must be a SELECT @@ -295,87 +306,54 @@ private DatasourceTable validateInsertTarget( } } - private void validateSegmentGranularity( + /** + * Gets the effective PARTITIONED BY granularity. Resolves the granularity from the granularity specified on the + * ingest node, and on the table metadata as stored in catalog, if any. Mismatches between the 2 granularities are + * allowed if both are specified. The granularity specified on the ingest node is taken to be the effective + * granulartiy if specified. If no granulartiy is specified on either the ingestNode or in the table catalog entry + * for the table, an error is thrown. + * + * @param operationName The operation name + * @param ingestNode The ingest node. + * @param tableMetadata The table metadata as stored in the catalog, if any. 
+ * + * @return The effective granularity + * @throws org.apache.druid.error.DruidException indicating invalud Sql if both the ingest node and table metadata + * for the respective target table have no PARTITIONED BY granularity defined. + */ + private Granularity getEffectiveGranularity( final String operationName, final DruidSqlIngest ingestNode, - final DatasourceFacade tableMetadata + @Nullable final DatasourceFacade tableMetadata ) { - final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity(); - if (definedGranularity != null) { - // Should already have been checked when creating the catalog entry - DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(null, definedGranularity); - } - final Granularity ingestionGranularity = ingestNode.getPartitionedBy().getGranularity(); + Granularity effectiveGranularity = null; + final Granularity ingestionGranularity = ingestNode.getPartitionedBy() != null + ? ingestNode.getPartitionedBy().getGranularity() + : null; if (ingestionGranularity != null) { DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(ingestNode, ingestionGranularity); - } - final Granularity finalGranularity; - if (definedGranularity == null) { - // The catalog has no granularity: apply the query value - if (ingestionGranularity == null) { - // Neither have a value: error - throw InvalidSqlInput.exception( - "Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", - operationName - ); - } else { - finalGranularity = ingestionGranularity; - } + effectiveGranularity = ingestionGranularity; } else { - // The catalog has a granularity - if (ingestionGranularity == null) { - // The query has no granularity: just apply the catalog granularity. - finalGranularity = definedGranularity; - } else if (definedGranularity.equals(ingestionGranularity)) { - // Both have a setting and they are the same. We assume this would - // likely occur only when moving to the catalog, and old queries still - // contain the PARTITION BY clause. - finalGranularity = definedGranularity; - } else { - // Both have a setting but they are different. Since the user declared - // the grain, using a different one is an error. If the user wants to - // vary the grain across different (re)ingestions, then, at present, don't - // declare the grain in the catalog. - // TODO: allow mismatch - throw InvalidSqlInput.exception( - "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]", - granularityToSqlString(definedGranularity), - granularityToSqlString(ingestionGranularity) - ); + final Granularity definedGranularity = tableMetadata == null + ? null + : tableMetadata.segmentGranularity(); + if (definedGranularity != null) { + // Should already have been checked when creating the catalog entry + DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(null, definedGranularity); + effectiveGranularity = definedGranularity; } } - // Note: though this is the validator, we cheat a bit and write the target - // granularity into the query context. Perhaps this step should be done - // during conversion, however, we've just worked out the granularity, so we - // do it here instead. 
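For illustration only — a distilled restatement of the precedence rule that getEffectiveGranularity() above implements, shown as a standalone sketch rather than the actual method: an explicit PARTITIONED BY on the statement wins, otherwise the catalog's segment granularity applies, and if neither is present validation fails.

import org.apache.druid.java.util.common.granularity.Granularity;

import javax.annotation.Nullable;

class EffectiveGranularityRule
{
  static Granularity resolve(
      @Nullable Granularity fromStatement, // granularity from PARTITIONED BY on the INSERT/REPLACE, if any
      @Nullable Granularity fromCatalog    // segment granularity from the catalog entry, if any
  )
  {
    if (fromStatement != null) {
      // An explicit PARTITIONED BY always wins; a mismatch with the catalog is allowed.
      return fromStatement;
    }
    if (fromCatalog != null) {
      return fromCatalog;
    }
    throw new IllegalStateException("PARTITIONED BY must be supplied by the statement or the catalog");
  }
}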
- try { - plannerContext.queryContextMap().put( - DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, - plannerContext.getPlannerToolbox().jsonMapper().writeValueAsString(finalGranularity) - ); - } - catch (JsonProcessingException e) { - throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", finalGranularity); + if (effectiveGranularity == null) { + throw InvalidSqlInput.exception( + "Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", + operationName); } - } - private String granularityToSqlString(final Granularity gran) - { - if (gran == null) { - return "NULL"; - } - // The validation path will only ever see the ALL granularity or - // a period granularity. Neither the parser nor catalog can - // create a Duration granularity. - if (Granularities.ALL == gran) { - return "ALL TIME"; - } - return ((PeriodGranularity) gran).getPeriod().toString(); + return effectiveGranularity; } - /** * Compute and validate the target type. In normal SQL, the engine would insert * a project operator after the SELECT before the write to cast columns from the diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java index 3bd4908a3955..13c1f3d77e99 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java @@ -35,7 +35,6 @@ import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.tools.ValidationException; -import org.apache.calcite.util.Pair; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidSqlInput; @@ -57,17 +56,13 @@ import org.apache.druid.storage.ExportStorageProvider; import java.util.List; -import java.util.regex.Pattern; public abstract class IngestHandler extends QueryHandler { - private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); - protected Granularity ingestionGranularity; protected IngestDestination targetDatasource; private SqlNode validatedQueryNode; - private RelDataType targetType; IngestHandler( HandlerContext handlerContext, @@ -75,8 +70,6 @@ public abstract class IngestHandler extends QueryHandler ) { super(handlerContext, explain); - //ingestionGranularity = ingestNode.getPartitionedBy() != null ? ingestNode.getPartitionedBy().getGranularity() : null; - //handlerContext.hook().captureInsert(ingestNode); } protected String operationName() @@ -160,9 +153,6 @@ public void validate() DruidSqlIngest ingestNode = ingestNode(); DruidSqlIngest validatedNode = (DruidSqlIngest) validate(ingestNode); validatedQueryNode = validatedNode.getSource(); - CalcitePlanner planner = handlerContext.planner(); - final SqlValidator validator = planner.getValidator(); - targetType = validator.getValidatedNodeType(validatedNode); ingestionGranularity = ingestNode().getPartitionedBy() != null ? 
ingestNode().getPartitionedBy().getGranularity() : null; @@ -256,7 +246,6 @@ protected PlannerResult planForDruid() throws ValidationException @Override protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException { - validateColumnsForIngestion(rootQueryRel); return handlerContext.engine().buildQueryMakerForInsert( targetDatasource, rootQueryRel, @@ -264,20 +253,6 @@ protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws Validati ); } - private void validateColumnsForIngestion(RelRoot rootQueryRel) - { - // Check that there are no unnamed columns in the insert. - for (Pair field : rootQueryRel.fields) { - if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) { - throw InvalidSqlInput.exception( - "Insertion requires columns to be named, but at least one of the columns was unnamed. This is usually " - + "the result of applying a function without having an AS clause, please ensure that all function calls" - + "are named with an AS clause as in \"func(X) as myColumn\"." - ); - } - } - } - /** * Handler for the INSERT statement. */ From cad940d07796def6ef3c1a3bc33e5cfc19dd39f7 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 16 Feb 2024 14:35:41 -0500 Subject: [PATCH 07/12] * remove duplicate validation being done in DruidSqlValidator and IngestHandlers --- .../calcite/planner/DruidSqlValidator.java | 2 +- .../sql/calcite/planner/IngestHandler.java | 54 +++++-------------- 2 files changed, 15 insertions(+), 41 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index fac8e7889bff..b04aa179c74f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -253,7 +253,7 @@ private DatasourceTable validateInsertTarget( final SqlIdentifier destId = insertNs.getId(); if (destId.names.isEmpty()) { // I don't think this can happen, but include a branch for it just in case. - throw InvalidSqlInput.exception("%s requires a target table.", operationName); + throw InvalidSqlInput.exception("Operation [%s] requires a target table", operationName); } // Druid does not support 3+ part names. 
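For illustration only — the target-table name forms affected by the validation above, written as SQL strings inside a Java sketch (the table and column names are made up): a bare datasource name or a two-part name under the druid schema is accepted, while names with three or more parts are rejected.

class TargetTableNameExamples
{
  // Accepted: a bare datasource name.
  static final String BARE_NAME =
      "INSERT INTO dst SELECT * FROM foo PARTITIONED BY DAY";

  // Accepted: a two-part name whose first part is the Druid datasource schema.
  static final String SCHEMA_QUALIFIED =
      "INSERT INTO \"druid\".dst SELECT * FROM foo PARTITIONED BY DAY";

  // Rejected: Druid does not support names with three or more parts.
  static final String TOO_MANY_PARTS =
      "INSERT INTO \"druid\".dst.extra SELECT * FROM foo PARTITIONED BY DAY";
}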
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java index 13c1f3d77e99..1ffce9e2dc8e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java @@ -33,7 +33,6 @@ import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOrderBy; import org.apache.calcite.sql.SqlSelect; -import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.tools.ValidationException; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; @@ -114,13 +113,6 @@ public void validate() if (ingestNode().getTargetTable() instanceof ExternalDestinationSqlIdentifier) { validateExport(); } else { - if (ingestNode().getPartitionedBy() == null) { - throw InvalidSqlInput.exception( - "Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", - operationName() - ); - } - if (ingestNode().getExportFileFormat() != null) { throw InvalidSqlInput.exception( "The AS clause should only be specified while exporting rows into an EXTERN destination.", @@ -129,18 +121,6 @@ public void validate() } } - try { - PlannerContext plannerContext = handlerContext.plannerContext(); - if (ingestionGranularity != null) { - plannerContext.queryContextMap().put( - DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, - plannerContext.getJsonMapper().writeValueAsString(ingestionGranularity) - ); - } - } - catch (JsonProcessingException e) { - throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", ingestionGranularity); - } // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes // the number of rows inserted to be limited which is likely to be confusing and unintended. if (handlerContext.queryContextMap().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) { @@ -153,9 +133,19 @@ public void validate() DruidSqlIngest ingestNode = ingestNode(); DruidSqlIngest validatedNode = (DruidSqlIngest) validate(ingestNode); validatedQueryNode = validatedNode.getSource(); - ingestionGranularity = ingestNode().getPartitionedBy() != null - ? ingestNode().getPartitionedBy().getGranularity() - : null; + // This context key is set during validation in + // org.apache.druid.sql.calcite.planner.DruidSqlValidator.validateInsert. + String effectiveGranularity = (String) handlerContext.queryContextMap() + .get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY); + try { + ingestionGranularity = effectiveGranularity != null + ? handlerContext.jsonMapper().readValue(effectiveGranularity, Granularity.class) + : null; + } + catch (JsonProcessingException e) { + // this should never happen, since the granularity value is validated before being written to contextMap. 
+ throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", effectiveGranularity); + } targetDatasource = validateAndGetDataSourceForIngest(); } @@ -184,27 +174,11 @@ protected RelDataType returnedRowType() private IngestDestination validateAndGetDataSourceForIngest() { final SqlInsert insert = ingestNode(); - if (insert.isUpsert()) { - throw InvalidSqlInput.exception("UPSERT is not supported."); - } - - if (insert.getTargetColumnList() != null) { - throw InvalidSqlInput.exception( - "Operation [%s] cannot be run with a target column list, given [%s (%s)]", - operationName(), - insert.getTargetTable(), insert.getTargetColumnList() - ); - } final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable(); final IngestDestination dataSource; - if (tableIdentifier.names.isEmpty()) { - // I don't think this can happen, but include a branch for it just in case. - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.DEFENSIVE) - .build("Operation [%s] requires a target table", operationName()); - } else if (tableIdentifier instanceof ExternalDestinationSqlIdentifier) { + if (tableIdentifier instanceof ExternalDestinationSqlIdentifier) { ExternalDestinationSqlIdentifier externalDestination = ((ExternalDestinationSqlIdentifier) tableIdentifier); ExportStorageProvider storageProvider = externalDestination.toExportStorageProvider(handlerContext.jsonMapper()); dataSource = new ExportDestination(storageProvider); From 50061d6433427380b7cfe098a9ba46862f2c6de4 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 16 Feb 2024 14:42:44 -0500 Subject: [PATCH 08/12] * simplify SqlInsert source query conversion --- .../sql/calcite/planner/IngestHandler.java | 68 ++++++++----------- 1 file changed, 28 insertions(+), 40 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java index 1ffce9e2dc8e..a91df7991b06 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java @@ -32,7 +32,6 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOrderBy; -import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.tools.ValidationException; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; @@ -71,6 +70,32 @@ public abstract class IngestHandler extends QueryHandler super(handlerContext, explain); } + protected static SqlNode convertSourceQuery(DruidSqlIngest sqlNode) + { + SqlNode query = sqlNode.getSource(); + + // Check if ORDER BY clause is not provided to the underlying query + if (query instanceof SqlOrderBy) { + SqlOrderBy sqlOrderBy = (SqlOrderBy) query; + SqlNodeList orderByList = sqlOrderBy.orderList; + if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { + throw InvalidSqlInput.exception( + "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", + sqlNode.getOperator().getName() + ); + } + } + if (sqlNode.getClusteredBy() != null) { + query = DruidSqlParserUtils.convertClusterByToOrderBy(query, sqlNode.getClusteredBy()); + } + + if (!query.isA(SqlKind.QUERY)) { + throw InvalidSqlInput.exception("Unexpected SQL statement type [%s], expected it to be a QUERY", query.getKind()); + } + + return query; + } + protected String operationName() { return 
ingestNode().getOperator().getName(); @@ -247,26 +272,8 @@ public InsertHandler( protected static DruidSqlInsert convertQuery(DruidSqlIngest sqlNode) { - SqlNode query = sqlNode.getSource(); - - // Check if ORDER BY clause is not provided to the underlying query - if (query instanceof SqlOrderBy) { - SqlOrderBy sqlOrderBy = (SqlOrderBy) query; - SqlNodeList orderByList = sqlOrderBy.orderList; - if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { - throw InvalidSqlInput.exception( - "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", - sqlNode.getOperator().getName() - ); - } - } - if (sqlNode.getClusteredBy() != null) { - query = DruidSqlParserUtils.convertClusterByToOrderBy(query, sqlNode.getClusteredBy()); - } + SqlNode query = convertSourceQuery(sqlNode); - if (!query.isA(SqlKind.QUERY)) { - throw InvalidSqlInput.exception("Unexpected SQL statement type [%s], expected it to be a QUERY", query.getKind()); - } return DruidSqlInsert.create(new SqlInsert( sqlNode.getParserPosition(), (SqlNodeList) sqlNode.getOperandList().get(0), @@ -333,27 +340,8 @@ public ReplaceHandler( protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode) { - SqlNode query = sqlNode.getSource(); - - // Check if ORDER BY clause is not provided to the underlying query - if (query instanceof SqlOrderBy) { - SqlOrderBy sqlOrderBy = (SqlOrderBy) query; - SqlNodeList orderByList = sqlOrderBy.orderList; - if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { - throw InvalidSqlInput.exception( - "Cannot use an ORDER BY clause on a Query of type [%s], use CLUSTERED BY instead", - sqlNode.getOperator().getName() - ); - } - } - if (sqlNode.getClusteredBy() != null) { - query = DruidSqlParserUtils.convertClusterByToOrderBy(query, sqlNode.getClusteredBy()); - sqlNode.setSource((SqlSelect) (((SqlOrderBy) query).query)); - } + SqlNode query = convertSourceQuery(sqlNode); - if (!query.isA(SqlKind.QUERY)) { - throw InvalidSqlInput.exception("Unexpected SQL statement type [%s], expected it to be a QUERY", query.getKind()); - } return DruidSqlReplace.create( new SqlInsert( sqlNode.getParserPosition(), From 12c3d60075a89b27450900571178ad0b10854943 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 16 Feb 2024 15:28:11 -0500 Subject: [PATCH 09/12] * remove redundant comments --- .../org/apache/druid/sql/calcite/planner/DruidSqlValidator.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index b04aa179c74f..90aa21907ac8 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -395,9 +395,7 @@ private RelDataType validateTargetType( final String colName = sourceField.getName(); final DatasourceFacade.ColumnFacade definedCol = tableMetadata.column(colName); if (definedCol == null) { - // No catalog definition for this column. if (isStrict) { - // Table is strict: cannot add new columns at ingest time. 
throw InvalidSqlInput.exception( "Column [%s] is not defined in the target table [%s] strict schema", colName, From 38363302a7840829c09e3f9a2eb907dd12b05b04 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 21 Feb 2024 03:12:43 -0500 Subject: [PATCH 10/12] * add tests for catalog provided segment granularity --- .../catalog/sql/CatalogIngestionTest.java | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java new file mode 100644 index 000000000000..e4020cc86d0c --- /dev/null +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.sql; + +import org.apache.druid.catalog.CatalogException; +import org.apache.druid.catalog.model.Columns; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.table.TableBuilder; +import org.apache.druid.catalog.storage.CatalogStorage; +import org.apache.druid.catalog.storage.CatalogTests; +import org.apache.druid.catalog.sync.CachedMetadataCatalog; +import org.apache.druid.catalog.sync.MetadataCatalog; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.CalciteIngestionDmlTest; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.fail; + +/** + * Test the use of catalog specs to drive MSQ ingestion. + */ +public class CatalogIngestionTest extends CalciteIngestionDmlTest +{ + @ClassRule + public static final TestDerbyConnector.DerbyConnectorRule DERBY_CONNECTION_RULE = + new TestDerbyConnector.DerbyConnectorRule(); + + /** + * Signature for the foo datasource after applying catalog metadata. 
+ */ + private static final RowSignature FOO_SIGNATURE = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("extra1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m1", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .add("m2", ColumnType.DOUBLE) + .build(); + + private static CatalogStorage storage; + + @Override + public CatalogResolver createCatalogResolver() + { + CatalogTests.DbFixture dbFixture = new CatalogTests.DbFixture(DERBY_CONNECTION_RULE); + storage = dbFixture.storage; + MetadataCatalog catalog = new CachedMetadataCatalog( + storage, + storage.schemaRegistry(), + storage.jsonMapper() + ); + return new LiveCatalogResolver(catalog); + } + + @Override + public void finalizeTestFramework(SqlTestFramework sqlTestFramework) + { + super.finalizeTestFramework(sqlTestFramework); + buildTargetDatasources(); + buildFooDatasource(); + } + + private void buildTargetDatasources() + { + TableMetadata spec = TableBuilder.datasource("hourDs", "PT1H") + .build(); + createTableMetadata(spec); + } + + public void buildFooDatasource() + { + TableMetadata spec = TableBuilder.datasource("foo", "ALL") + .timeColumn() + .column("extra1", null) + .column("dim2", null) + .column("dim1", null) + .column("cnt", null) + .column("m1", Columns.DOUBLE) + .column("extra2", Columns.LONG) + .column("extra3", Columns.STRING) + .hiddenColumns(Arrays.asList("dim3", "unique_dim1")) + .sealed(true) + .build(); + createTableMetadata(spec); + } + + private void createTableMetadata(TableMetadata table) + { + try { + storage.tables().create(table); + } + catch (CatalogException e) { + fail(e.getMessage()); + } + } + + /** + * If the segment grain is given in the catalog then use this value is used. + */ + @Test + public void testInsertHourGrain() + { + testIngestionQuery() + .sql("INSERT INTO hourDs\n" + + "SELECT * FROM foo") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2") + .context(queryContextWithGranularity(Granularities.HOUR)) + .build() + ) + .verify(); + } + + /** + * If the segment grain is given in the catalog, and also by PARTITIONED BY, then + * the query value is used. 
+ */ + @Test + public void testInsertHourGrainWithDay() + { + testIngestionQuery() + .sql("INSERT INTO hourDs\n" + + "SELECT * FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } +} From 914553fd35aa18166e0716846a92ac17c6ca0093 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 21 Feb 2024 03:38:40 -0500 Subject: [PATCH 11/12] * remove uneeded comments --- .../druid/sql/calcite/planner/DruidSqlIngestOperator.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java index 90be66d84b70..4ddc2f25af21 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java @@ -58,12 +58,10 @@ public SqlCall createCall( { return new DruidSqlInsert( pos, - // Must match SqlInsert.getOperandList() (SqlNodeList) operands[0], operands[1], operands[2], (SqlNodeList) operands[3], - // Must match DruidSqlIngest.getOperandList() (SqlGranularityLiteral) operands[4], (SqlNodeList) operands[5], null // fix this @@ -87,15 +85,12 @@ public SqlCall createCall( { return new DruidSqlReplace( pos, - // Must match SqlInsert.getOperandList() (SqlNodeList) operands[0], operands[1], operands[2], (SqlNodeList) operands[3], - // Must match DruidSqlIngest.getOperandList() (SqlGranularityLiteral) operands[4], (SqlNodeList) operands[5], - // Must match DruidSqlReplace.getOperandList() operands[6], null // fix this ); From 0608676d591aa14efcffb90f1670e8d0347ac968 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 21 Feb 2024 12:57:14 -0500 Subject: [PATCH 12/12] * fix exportFileFormat issue --- sql/src/main/codegen/includes/common.ftl | 4 +- sql/src/main/codegen/includes/insert.ftl | 2 +- sql/src/main/codegen/includes/replace.ftl | 4 +- .../sql/calcite/parser/DruidSqlIngest.java | 8 ++- .../sql/calcite/parser/DruidSqlInsert.java | 7 ++- .../sql/calcite/parser/DruidSqlReplace.java | 15 ++--- .../planner/DruidSqlIngestOperator.java | 7 ++- .../sql/calcite/planner/IngestHandler.java | 8 +-- .../druid/sql/calcite/CalciteExportTest.java | 56 +++++++++++++++++++ 9 files changed, 86 insertions(+), 25 deletions(-) diff --git a/sql/src/main/codegen/includes/common.ftl b/sql/src/main/codegen/includes/common.ftl index 95138a7dbbf1..1edc542dc990 100644 --- a/sql/src/main/codegen/includes/common.ftl +++ b/sql/src/main/codegen/includes/common.ftl @@ -107,14 +107,14 @@ SqlTypeNameSpec DruidType() : } // Parses the supported file formats for export. 
-String FileFormat() : +SqlIdentifier FileFormat() : { SqlNode format; } { format = SimpleIdentifier() { - return format.toString(); + return (SqlIdentifier) format; } } diff --git a/sql/src/main/codegen/includes/insert.ftl b/sql/src/main/codegen/includes/insert.ftl index 81f5ed1253e3..0a949aec4334 100644 --- a/sql/src/main/codegen/includes/insert.ftl +++ b/sql/src/main/codegen/includes/insert.ftl @@ -35,7 +35,7 @@ SqlNode DruidSqlInsertEof() : final Pair p; SqlGranularityLiteral partitionedBy = null; SqlNodeList clusteredBy = null; - String exportFileFormat = null; + SqlIdentifier exportFileFormat = null; } { ( diff --git a/sql/src/main/codegen/includes/replace.ftl b/sql/src/main/codegen/includes/replace.ftl index 15edeaac12e3..d067bc450bb8 100644 --- a/sql/src/main/codegen/includes/replace.ftl +++ b/sql/src/main/codegen/includes/replace.ftl @@ -30,7 +30,7 @@ SqlNode DruidSqlReplaceEof() : SqlNodeList clusteredBy = null; final Pair p; SqlNode replaceTimeQuery = null; - String exportFileFormat = null; + SqlIdentifier exportFileFormat = null; } { { s = span(); } @@ -90,7 +90,7 @@ SqlNode DruidSqlReplaceEof() : { sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, destination, source, columnList); - return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, replaceTimeQuery, exportFileFormat); + return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, exportFileFormat, replaceTimeQuery); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java index a36ef9b6b96e..cce253b4c1ef 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.parser; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -43,7 +44,7 @@ public abstract class DruidSqlIngest extends SqlInsert @Nullable protected final SqlNodeList clusteredBy; @Nullable - private final String exportFileFormat; + private final SqlIdentifier exportFileFormat; public DruidSqlIngest( SqlParserPos pos, @@ -53,7 +54,7 @@ public DruidSqlIngest( SqlNodeList columnList, @Nullable SqlGranularityLiteral partitionedBy, @Nullable SqlNodeList clusteredBy, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat ) { super(pos, keywords, targetTable, source, columnList); @@ -76,7 +77,7 @@ public SqlNodeList getClusteredBy() } @Nullable - public String getExportFileFormat() + public SqlIdentifier getExportFileFormat() { return exportFileFormat; } @@ -88,6 +89,7 @@ public List getOperandList() .addAll(super.getOperandList()) .add(partitionedBy) .add(clusteredBy) + .add(exportFileFormat) .build(); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java index 75bcfb86d52d..e283c9df9586 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.parser; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -46,7 +47,7 @@ public static 
DruidSqlInsert create( @Nonnull SqlInsert insertNode, @Nullable SqlGranularityLiteral partitionedBy, @Nullable SqlNodeList clusteredBy, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat ) { return new DruidSqlInsert( @@ -75,7 +76,7 @@ public DruidSqlInsert( SqlNodeList columnList, @Nullable SqlGranularityLiteral partitionedBy, @Nullable SqlNodeList clusteredBy, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat ) { super( @@ -111,7 +112,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) writer.newlineAndIndent(); if (getExportFileFormat() != null) { writer.keyword("AS"); - writer.print(getExportFileFormat()); + writer.print(getExportFileFormat().toString()); writer.newlineAndIndent(); } getSource().unparse(writer, 0, 0); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java index d4d7bcf6fd03..45b677631d2a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.parser; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; @@ -50,8 +51,8 @@ public static DruidSqlReplace create( @Nonnull SqlInsert insertNode, @Nullable SqlGranularityLiteral partitionedBy, @Nullable SqlNodeList clusteredBy, - @Nullable SqlNode replaceTimeQuery, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat, + @Nullable SqlNode replaceTimeQuery ) { return new DruidSqlReplace( @@ -62,8 +63,8 @@ public static DruidSqlReplace create( insertNode.getTargetColumnList(), partitionedBy, clusteredBy, - replaceTimeQuery, - exportFileFormat + exportFileFormat, + replaceTimeQuery ); } @@ -81,8 +82,8 @@ public DruidSqlReplace( SqlNodeList columnList, @Nullable SqlGranularityLiteral partitionedBy, @Nullable SqlNodeList clusteredBy, - @Nullable SqlNode replaceTimeQuery, - @Nullable String exportFileFormat + @Nullable SqlIdentifier exportFileFormat, + @Nullable SqlNode replaceTimeQuery ) { super( @@ -136,7 +137,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) if (getExportFileFormat() != null) { writer.keyword("AS"); - writer.print(getExportFileFormat()); + writer.print(getExportFileFormat().toString()); writer.newlineAndIndent(); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java index 4ddc2f25af21..628e47686631 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlIngestOperator.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite.planner; import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; @@ -64,7 +65,7 @@ public SqlCall createCall( (SqlNodeList) operands[3], (SqlGranularityLiteral) operands[4], (SqlNodeList) operands[5], - null // fix this + (SqlIdentifier) operands[6] ); } } @@ -91,8 +92,8 @@ public SqlCall createCall( (SqlNodeList) operands[3], (SqlGranularityLiteral) operands[4], (SqlNodeList) operands[5], - operands[6], - 
null // fix this + (SqlIdentifier) operands[6], + operands[7] ); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java index a91df7991b06..4862f2821827 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java @@ -118,7 +118,7 @@ private void validateExport() .build("Export statements do not support a PARTITIONED BY or CLUSTERED BY clause."); } - final String exportFileFormat = ingestNode().getExportFileFormat(); + final SqlIdentifier exportFileFormat = ingestNode().getExportFileFormat(); if (exportFileFormat == null) { throw InvalidSqlInput.exception( "Exporting rows into an EXTERN destination requires an AS clause to specify the format, but none was found.", @@ -127,7 +127,7 @@ private void validateExport() } else { handlerContext.plannerContext().queryContextMap().put( DruidSqlIngest.SQL_EXPORT_FILE_FORMAT, - exportFileFormat + exportFileFormat.toString() ); } } @@ -352,8 +352,8 @@ protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode) ), sqlNode.getPartitionedBy(), sqlNode.getClusteredBy(), - sqlNode.getReplaceTimeQuery(), - sqlNode.getExportFileFormat() + sqlNode.getExportFileFormat(), + sqlNode.getReplaceTimeQuery() ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java index cc4b2a0fec49..4a97367fcd19 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import org.apache.calcite.avatica.SqlType; import org.apache.druid.error.DruidException; import org.apache.druid.guice.DruidInjectorBuilder; import org.apache.druid.initialization.DruidModule; @@ -37,6 +38,7 @@ import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.destination.ExportDestination; +import org.apache.druid.sql.http.SqlParameter; import org.apache.druid.storage.StorageConfig; import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.local.LocalFileExportStorageProvider; @@ -46,6 +48,7 @@ import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; +import java.util.Collections; import java.util.List; public class CalciteExportTest extends CalciteIngestionDmlTest @@ -176,6 +179,59 @@ public void testInsertIntoExtern() .verify(); } + + @Test + public void testInsertIntoExternParameterized() + { + testIngestionQuery() + .sql(StringUtils.format("INSERT INTO EXTERN(%s()) " + + "AS CSV " + + "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME)) + .parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val"))) + .expectQuery( + Druids.newScanQueryBuilder() + .dataSource( + "foo" + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters(equality("dim2", "val", ColumnType.STRING)) + .columns("dim2") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .build() + ) + .expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME)) + .expectTarget(ExportDestination.TYPE_KEY, 
RowSignature.builder().add("dim2", ColumnType.STRING).build()) + .verify(); + } + + // Disabled until replace supports external destinations. To be enabled after that point. + @Test + @Ignore + public void testReplaceIntoExternParameterized() + { + testIngestionQuery() + .sql(StringUtils.format("REPLACE INTO EXTERN(%s()) " + + "AS CSV " + + "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME)) + .parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val"))) + .expectQuery( + Druids.newScanQueryBuilder() + .dataSource( + "foo" + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters(equality("dim2", "val", ColumnType.STRING)) + .columns("dim2") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .build() + ) + .expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME)) + .expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build()) + .verify(); + } + @Test public void testExportWithoutFormat() {