Move INSERT & REPLACE validation to the Calcite validator #15908

Merged: 14 commits, Feb 22, 2024

Changes from 4 commits
@@ -162,6 +162,9 @@
import org.apache.druid.sql.calcite.export.TestExportStorageConnectorProvider;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.external.HttpOperatorConversion;
import org.apache.druid.sql.calcite.external.InlineOperatorConversion;
import org.apache.druid.sql.calcite.external.LocalOperatorConversion;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -351,6 +354,9 @@ public void configure(Binder binder)
binder.install(new NestedDataModule());
NestedDataModule.registerHandlersAndSerde();
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HttpOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, InlineOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, LocalOperatorConversion.class);
}

@Override
@@ -25,6 +25,7 @@
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -39,7 +40,7 @@ public class DruidSqlInsert extends DruidSqlIngest
public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity";

// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
public static final SqlOperator OPERATOR = DruidSqlIngestOperator.INSERT_OPERATOR;

public static DruidSqlInsert create(
@Nonnull SqlInsert insertNode,
@@ -20,15 +20,14 @@
package org.apache.druid.sql.calcite.parser;

import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -43,7 +42,7 @@ public class DruidSqlReplace extends DruidSqlIngest
{
public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks";

public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER);
public static final SqlOperator OPERATOR = DruidSqlIngestOperator.REPLACE_OPERATOR;

private final SqlNode replaceTimeQuery;

@@ -86,6 +86,7 @@ public class CalcitePlanner implements Planner, ViewExpander
private final @Nullable RelOptCostFactory costFactory;
private final Context context;
private final CalciteConnectionConfig connectionConfig;
private final DruidSqlValidator.ValidatorContext validatorContext;
private final RelDataTypeSystem typeSystem;

/**
@@ -118,9 +119,10 @@ public class CalcitePlanner implements Planner, ViewExpander
* {@link org.apache.calcite.tools.Frameworks#getPlanner} instead.
*/
@SuppressWarnings("method.invocation.invalid")
public CalcitePlanner(FrameworkConfig config)
public CalcitePlanner(FrameworkConfig config, DruidSqlValidator.ValidatorContext validatorContext)
{
this.costFactory = config.getCostFactory();
this.validatorContext = validatorContext;
this.defaultSchema = config.getDefaultSchema();
this.operatorTable = config.getOperatorTable();
this.programs = config.getPrograms();
@@ -283,7 +285,7 @@ public final RelNode convert(SqlNode sql)
public RelRoot rel(SqlNode sql)
{
ensure(CalcitePlanner.State.STATE_4_VALIDATED);
SqlNode validatedSqlNode = Objects.requireNonNull(
Objects.requireNonNull(
this.validatedSqlNode,
"validatedSqlNode is null. Need to call #validate() first"
);
@@ -295,11 +297,11 @@ public RelRoot rel(SqlNode sql)
final SqlToRelConverter.Config config =
sqlToRelConverterConfig.withTrimUnusedFields(false);
final SqlToRelConverter sqlToRelConverter =
new SqlToRelConverter(this, validator,
new DruidSqlToRelConverter(this, validator,
createCatalogReader(), cluster, convertletTable, config
);
RelRoot root =
sqlToRelConverter.convertQuery(validatedSqlNode, false, true);
sqlToRelConverter.convertQuery(sql, false, true);
root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
final RelBuilder relBuilder =
config.getRelBuilderFactory().create(cluster, null);
@@ -408,7 +410,8 @@ private SqlValidator createSqlValidator(CalciteCatalogReader catalogReader)
catalogReader,
getTypeFactory(),
validatorConfig,
context.unwrapOrThrow(PlannerContext.class)
context.unwrapOrThrow(PlannerContext.class),
validatorContext
);
}

@@ -119,7 +119,7 @@ public AuthResult(
)
{
this.frameworkConfig = frameworkConfig;
this.planner = new CalcitePlanner(frameworkConfig);
this.planner = new CalcitePlanner(frameworkConfig, new ValidatorContextImpl());
this.plannerContext = plannerContext;
this.engine = engine;
this.hook = hook == null ? NoOpPlannerHook.INSTANCE : hook;
@@ -318,6 +318,33 @@ public PlannerHook hook()
}
}

public class ValidatorContextImpl implements DruidSqlValidator.ValidatorContext
{
@Override
public Map<String, Object> queryContextMap()
{
return plannerContext.queryContextMap();
}

@Override
public CatalogResolver catalog()
{
return plannerContext.getPlannerToolbox().catalogResolver();
}

@Override
public String druidSchemaName()
{
return plannerContext.getPlannerToolbox().druidSchemaName();
}

@Override
public ObjectMapper jsonMapper()
{
return plannerContext.getPlannerToolbox().jsonMapper();
}
}

public static DruidException translateException(Exception e)
{
try {
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.planner;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.parser.SqlGranularityLiteral;

import java.util.HashSet;
import java.util.Set;

public class DruidSqlIngestOperator extends SqlSpecialOperator implements AuthorizableOperator
{
public static final SqlSpecialOperator INSERT_OPERATOR =
new DruidSqlInsertOperator();
public static final SqlSpecialOperator REPLACE_OPERATOR =
new DruidSqlReplaceOperator();

public static class DruidSqlInsertOperator extends DruidSqlIngestOperator
{
public DruidSqlInsertOperator()
{
super("INSERT");
}

@Override
public SqlCall createCall(
SqlLiteral functionQualifier,
SqlParserPos pos,
SqlNode... operands
)
{
return new DruidSqlInsert(
pos,
// Must match SqlInsert.getOperandList()
(SqlNodeList) operands[0],
operands[1],
operands[2],
(SqlNodeList) operands[3],
// Must match DruidSqlIngest.getOperandList()
(SqlGranularityLiteral) operands[4],
(SqlNodeList) operands[5],
null // fix this
);
}
}

public static class DruidSqlReplaceOperator extends DruidSqlIngestOperator
{
public DruidSqlReplaceOperator()
{
super("REPLACE");
}

@Override
public SqlCall createCall(
SqlLiteral functionQualifier,
SqlParserPos pos,
SqlNode... operands
)
{
return new DruidSqlReplace(
pos,
// Must match SqlInsert.getOperandList()
Review comment (Member):
I don't really understand these comments - they don't help me understand. Do we need them?
You could create a bunch of local variables with the casted types and name them accordingly - that might help, or even provide a place to add comments.

Reply (zachjsh, Contributor, Author):
Removed the comments, let me know if ok now.

(A sketch of the reviewer's suggestion follows the DruidSqlReplaceOperator class below.)

(SqlNodeList) operands[0],
operands[1],
operands[2],
(SqlNodeList) operands[3],
// Must match DruidSqlIngest.getOperandList()
(SqlGranularityLiteral) operands[4],
(SqlNodeList) operands[5],
// Must match DruidSqlReplace.getOperandList()
operands[6],
null // fix this
Review comment (Member):
What needs to be fixed? I would recommend either using FIXME and addressing it before merging the patch, or removing this comment and making sure the bad case can't happen by throwing some reasonable exception.

Reply (zachjsh, Contributor, Author, Feb 16, 2024):
I think that in order for this to work properly, the exportFileFormat needs to be changed into a SqlNode and added as an operand. Without this, I don't think that parameterized queries using export capabilities will work. cc @adarshsanjeev

Reply (zachjsh, Contributor, Author):
Fixed.

(A sketch of the change described above appears at the end of this file's diff.)
);
}
}
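
A minimal sketch of the reviewer's suggestion, assuming the operand layout shown above (SqlInsert's four operands followed by the Druid-specific ones); the variable names are hypothetical and not part of this patch:

  // Hypothetical: name each casted operand so the positions document themselves.
  final SqlNodeList keywords = (SqlNodeList) operands[0];
  final SqlNode targetTable = operands[1];
  final SqlNode source = operands[2];
  final SqlNodeList columnList = (SqlNodeList) operands[3];
  final SqlGranularityLiteral partitionedBy = (SqlGranularityLiteral) operands[4];
  final SqlNodeList clusteredBy = (SqlNodeList) operands[5];
  final SqlNode replaceTimeQuery = operands[6];
  return new DruidSqlReplace(pos, keywords, targetTable, source, columnList,
      partitionedBy, clusteredBy, replaceTimeQuery, null);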

public DruidSqlIngestOperator(String name)
{
super(name, SqlKind.INSERT);
}

@Override
public Set<ResourceAction> computeResources(SqlCall call, boolean inputSourceTypeSecurityEnabled)
{
// resource actions are computed in the respective ingest handlers.
return new HashSet<>();
}
}
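
A minimal sketch of the fix described in the second thread above, assuming DruidSqlReplace's constructor is updated to accept the export file format as a SqlNode; the operand position and comments are assumptions, not the final patch:

  // Hypothetical: carry the export file format as a SqlNode operand so it
  // survives createCall() round trips (e.g. parameter substitution).
  @Override
  public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands)
  {
    return new DruidSqlReplace(
        pos,
        (SqlNodeList) operands[0],           // keywords
        operands[1],                         // target table
        operands[2],                         // source query
        (SqlNodeList) operands[3],           // column list
        (SqlGranularityLiteral) operands[4], // PARTITIONED BY granularity
        (SqlNodeList) operands[5],           // CLUSTERED BY columns
        operands[6],                         // OVERWRITE time chunks
        operands[7]                          // export file format, now a SqlNode
    );
  }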
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.planner;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable.ViewExpander;
import org.apache.calcite.prepare.Prepare.CatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.SqlToRelConverter;

public class DruidSqlToRelConverter extends SqlToRelConverter
{
public DruidSqlToRelConverter(
final ViewExpander viewExpander,
final SqlValidator validator,
final CatalogReader catalogReader,
RelOptCluster cluster,
final SqlRexConvertletTable convertletTable,
final Config config
)
{
super(viewExpander, validator, catalogReader, cluster, convertletTable, config);
}

/**
* Convert a Druid {@code INSERT} or {@code REPLACE} statement. The code is the same
* as the normal conversion, except we don't actually create the final modify node.
* Druid has its own special way to handle inserts. (This should probably change in
* some future, but doing so requires changes in the SQL engine and MSQ, which is a bit
* invasive.)
*/
@Override
protected RelNode convertInsert(SqlInsert call)
{
// Get the target type: the column types we want to write into the target datasource.
final RelDataType targetRowType = validator.getValidatedNodeType(call);
assert targetRowType != null;

// Convert the underlying SELECT. We pushed the CLUSTERED BY clause into the SELECT
// as its ORDER BY. We claim this is the top query because MSQ doesn't actually
// use the Calcite insert node.
RelNode sourceRel = convertQueryRecursive(call.getSource(), true, targetRowType).project();

// We omit the column mapping and insert node that Calcite normally provides.
// Presumably MSQ does these its own way.
return sourceRel;
}
}