Integrate catalog schema validation into planner - WIP #15711

Closed · wants to merge 7 commits
@@ -155,7 +155,8 @@ public OverlordClient overlordClient()
   public QueryMaker buildQueryMakerForInsert(
       final String targetDataSource,
       final RelRoot relRoot,
-      final PlannerContext plannerContext
+      final PlannerContext plannerContext,
+      final RelDataType targetType
   )
   {
     validateInsert(relRoot.rel, relRoot.fields, plannerContext);
@@ -160,6 +160,9 @@
 import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
 import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
+import org.apache.druid.sql.calcite.external.HttpOperatorConversion;
+import org.apache.druid.sql.calcite.external.InlineOperatorConversion;
+import org.apache.druid.sql.calcite.external.LocalOperatorConversion;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -347,6 +350,9 @@ public void configure(Binder binder)
       binder.install(new NestedDataModule());
       NestedDataModule.registerHandlersAndSerde();
       SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
+      SqlBindings.addOperatorConversion(binder, HttpOperatorConversion.class);
+      SqlBindings.addOperatorConversion(binder, InlineOperatorConversion.class);
+      SqlBindings.addOperatorConversion(binder, LocalOperatorConversion.class);
     }

     @Override
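
The three new bindings register Druid's HTTP, inline, and local-file table functions in the test setup alongside the existing EXTERN conversion, so catalog-aware ingest tests can reference external data through named functions. A rough sketch of the statement shape this enables (the function name comes from the binding above; the parameter names and the EXTEND clause are assumptions for illustration, not taken from this diff):

```java
// Hypothetical test query shape enabled by the HttpOperatorConversion binding.
// Parameter names (userName, uris, format) are assumed, not confirmed by this PR.
String sql =
    "INSERT INTO dst\n"
    + "SELECT x, y FROM TABLE(http(\n"
    + "  userName => 'bob',\n"
    + "  uris => ARRAY['http://example.com/data.json'],\n"
    + "  format => 'json'\n"
    + ")) EXTEND (x VARCHAR, y BIGINT)\n"
    + "PARTITIONED BY ALL TIME";
```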
@@ -84,6 +84,7 @@ public void validate(Object value, ObjectMapper jsonMapper)
   {
     String gran = decode(value, jsonMapper);
     if (Strings.isNullOrEmpty(gran)) {
+      // TODO: allow null granularity
       throw new IAE("Segment granularity is required.");
     }
     CatalogUtils.validateGranularity(gran);
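
The new TODO hints that a catalog entry may eventually be allowed to omit segment granularity, deferring the "required" error to insert analysis, where a statement-level PARTITIONED BY can still satisfy it. A minimal sketch of that relaxed check (hypothetical, not part of this PR; identifiers reused from the hunk above):

```java
// Hypothetical future body of validate() (not in this PR): tolerate a missing
// granularity here and let insert analysis enforce the requirement instead.
String gran = decode(value, jsonMapper);
if (!Strings.isNullOrEmpty(gran)) {
  CatalogUtils.validateGranularity(gran);
}
```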
27 changes: 10 additions & 17 deletions sql/src/main/codegen/includes/common.ftl
@@ -18,58 +18,51 @@
  */

 // Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
-org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
+SqlNode PartitionGranularity() :
 {
     SqlNode e;
-    Granularity granularity;
-    String unparseString;
+    SqlNode result;
 }
 {
     (
         <HOUR>
         {
-            granularity = Granularities.HOUR;
-            unparseString = "HOUR";
+            result = SqlLiteral.createSymbol(DruidSqlParserUtils.GranularityGrain.HOUR_GRAIN, getPos());
         }
     |
         <DAY>
         {
-            granularity = Granularities.DAY;
-            unparseString = "DAY";
+            result = SqlLiteral.createSymbol(DruidSqlParserUtils.GranularityGrain.DAY_GRAIN, getPos());
         }
     |
         <MONTH>
         {
-            granularity = Granularities.MONTH;
-            unparseString = "MONTH";
+            result = SqlLiteral.createSymbol(DruidSqlParserUtils.GranularityGrain.MONTH_GRAIN, getPos());
         }
     |
         <YEAR>
         {
-            granularity = Granularities.YEAR;
-            unparseString = "YEAR";
+            result = SqlLiteral.createSymbol(DruidSqlParserUtils.GranularityGrain.YEAR_GRAIN, getPos());
         }
     |
         <ALL>
         {
-            granularity = Granularities.ALL;
-            unparseString = "ALL";
+            result = SqlLiteral.createSymbol(DruidSqlParserUtils.GranularityGrain.ALL_GRAIN, getPos());
         }
         [
             <TIME>
             {
-                unparseString += " TIME";
+                result = SqlLiteral.createSymbol(DruidSqlParserUtils.GranularityGrain.ALL_TIME_GRAIN, getPos());
             }
         ]
     |
         e = Expression(ExprContext.ACCEPT_SUB_QUERY)
         {
-            granularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
-            unparseString = e.toString();
+            result = e;
         }
     )
     {
-        return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
+        return result;
     }
 }
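
The rule now returns a plain `SqlNode` — a `SqlLiteral` symbol for the named grains, or the raw expression for the free-form case — so resolution to a `Granularity` moves out of the parser. A sketch of what the consuming side might look like (the `GranularityGrain` constants come from the hunk above; the helper itself is hypothetical):

```java
// Hypothetical consumer of PartitionGranularity()'s result (not in this diff).
// Assumed imports: org.apache.calcite.sql.{SqlLiteral, SqlNode}, plus Druid's
// Granularities, Granularity, and DruidSqlParserUtils.
// SqlLiteral.symbolValue is standard Calcite API for reading back a symbol
// created with SqlLiteral.createSymbol.
static Granularity granularityOf(SqlNode partitionedBy)
{
  if (partitionedBy instanceof SqlLiteral) {
    switch (((SqlLiteral) partitionedBy).symbolValue(DruidSqlParserUtils.GranularityGrain.class)) {
      case HOUR_GRAIN:  return Granularities.HOUR;
      case DAY_GRAIN:   return Granularities.DAY;
      case MONTH_GRAIN: return Granularities.MONTH;
      case YEAR_GRAIN:  return Granularities.YEAR;
      case ALL_GRAIN:
      case ALL_TIME_GRAIN:
        return Granularities.ALL;
    }
  }
  // Free-form expressions would keep using the existing conversion,
  // DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(...).
  throw new UnsupportedOperationException("expression grains resolved elsewhere in this sketch");
}
```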
16 changes: 5 additions & 11 deletions sql/src/main/codegen/includes/insert.ftl
@@ -78,27 +78,21 @@ SqlNode DruidSqlInsert() :
 SqlNode DruidSqlInsertEof() :
 {
     SqlNode insertNode;
-    org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
+    SqlNode partitionedBy = null;
     SqlNodeList clusteredBy = null;
 }
 {
     insertNode = DruidSqlInsert()
-    // PARTITIONED BY is necessary, but is kept optional in the grammar. It is asserted that it is not missing in the
-    // DruidSqlInsert constructor so that we can return a custom error message.
+    // PARTITIONED BY is necessary. It can be provided either in this statement or in the catalog.
+    // As a result, it is optional in the grammar. It is asserted that it is not missing in the
+    // insert analysis step.
     [
         <PARTITIONED> <BY>
         partitionedBy = PartitionGranularity()
     ]
     [
         clusteredBy = ClusteredBy()
     ]
-    {
-        if (clusteredBy != null && partitionedBy.lhs == null) {
-            throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
-                "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
-            );
-        }
-    }
     // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
     // The reason for adding EOF here is to ensure that we create a DruidSqlInsert node after the syntax has been
     // validated and throw SQL syntax errors before performing validations in the DruidSqlInsert which can overshadow the
@@ -111,6 +105,6 @@ SqlNode DruidSqlInsertEof() :
         return insertNode;
     }
     SqlInsert sqlInsert = (SqlInsert) insertNode;
-    return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy);
+    return new DruidSqlInsert(sqlInsert, partitionedBy, clusteredBy);
 }
}
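
Because the clause is now optional at parse time, both statement shapes below get past the parser; the first can succeed only when the catalog supplies a segment granularity for the target table. Note also that the CLUSTERED BY-before-PARTITIONED BY check is gone from the grammar: with a catalog-supplied granularity, CLUSTERED BY without an explicit PARTITIONED BY becomes legal, so that ordering rule can no longer be enforced here. A sketch (table and column names are invented):

```java
// Both parse under the new grammar; names are invented for illustration.
String usesCatalogGranularity =
    "INSERT INTO dst SELECT * FROM src";                    // granularity from catalog
String explicitGranularity =
    "INSERT INTO dst SELECT * FROM src PARTITIONED BY DAY"; // granularity in statement
```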
12 changes: 2 additions & 10 deletions sql/src/main/codegen/includes/replace.ftl
@@ -25,8 +25,7 @@ SqlNode DruidSqlReplaceEof() :
     SqlNodeList columnList = null;
     final Span s;
     SqlInsert sqlInsert;
-    // Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
-    org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
+    SqlNode partitionedBy = null;
     SqlNodeList clusteredBy = null;
     final Pair<SqlNodeList, SqlNodeList> p;
     SqlNode replaceTimeQuery = null;
@@ -58,21 +57,14 @@
     [
         clusteredBy = ClusteredBy()
     ]
-    {
-        if (clusteredBy != null && partitionedBy.lhs == null) {
-            throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
-                "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
-            );
-        }
-    }
     // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
     // The reason for adding EOF here is to ensure that we create a DruidSqlReplace node after the syntax has been
     // validated and throw SQL syntax errors before performing validations in the DruidSqlReplace which can overshadow the
     // actual error message.
     <EOF>
     {
         sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, table, source, columnList);
-        return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, replaceTimeQuery);
+        return new DruidSqlReplace(sqlInsert, partitionedBy, clusteredBy, replaceTimeQuery);
     }
 }
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//CHECKSTYLE.OFF: PackageName - Must be in Calcite
+
+package org.apache.calcite.sql.validate;
+
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+
+public class ValidatorShim
+{
+  /**
+   * Creates an OrderBy scope for Druid-specific code, since OrderByScope is package-private to Calcite.
+   * @param parent    the enclosing validator scope
+   * @param orderList the ORDER BY item list
+   * @param select    the SELECT the ORDER BY applies to
+   * @return a new OrderByScope over the given select
+   */
+  public static OrderByScope newOrderByScope(
+      SqlValidatorScope parent,
+      SqlNodeList orderList,
+      SqlSelect select)
+  {
+    return new OrderByScope(parent, orderList, select);
+  }
+}
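
`OrderByScope` is package-private in Calcite, which is why this shim must live in `org.apache.calcite.sql.validate` (hence the checkstyle suppression). Druid-side validator code can then reach it with a plain static call; a minimal sketch (the surrounding variables are assumed):

```java
// Hypothetical call site in Druid's validator (not part of this diff):
// obtain an ORDER BY scope for a select that Druid wraps with ingest clauses.
// "parentScope", "orderList", and "select" come from the surrounding validator code.
SqlValidatorScope orderScope = ValidatorShim.newOrderByScope(parentScope, orderList, select);
```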
@@ -19,11 +19,13 @@

 package org.apache.druid.sql.calcite.parser;

+import java.util.List;
+
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.calcite.util.ImmutableNullableList;

 import javax.annotation.Nullable;
@@ -34,10 +36,8 @@
  */
 public abstract class DruidSqlIngest extends SqlInsert
 {
-  protected final Granularity partitionedBy;
-
-  // Used in the unparse function to generate the original query since we convert the string to an enum
-  protected final String partitionedByStringForUnparse;
+  @Nullable
+  protected final SqlNode partitionedBy;

   @Nullable
   protected final SqlNodeList clusteredBy;
@@ -48,19 +48,17 @@ public DruidSqlIngest(
       SqlNode targetTable,
       SqlNode source,
       SqlNodeList columnList,
-      @Nullable Granularity partitionedBy,
-      @Nullable String partitionedByStringForUnparse,
+      @Nullable SqlNode partitionedBy,
       @Nullable SqlNodeList clusteredBy
   )
   {
     super(pos, keywords, targetTable, source, columnList);

-    this.partitionedByStringForUnparse = partitionedByStringForUnparse;
     this.partitionedBy = partitionedBy;
     this.clusteredBy = clusteredBy;
   }

-  public Granularity getPartitionedBy()
+  public SqlNode getPartitionedBy()
   {
     return partitionedBy;
   }
@@ -70,4 +68,14 @@ public SqlNodeList getClusteredBy()
   {
     return clusteredBy;
   }
+
+  @Override
+  public List<SqlNode> getOperandList()
+  {
+    return ImmutableNullableList.<SqlNode>builder()
+        .addAll(super.getOperandList())
+        .add(partitionedBy)
+        .add(clusteredBy)
+        .build();
+  }
 }
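
Registering `partitionedBy` and `clusteredBy` as operands is what makes them visible to Calcite's generic tree machinery: utilities that copy or rewrite a `SqlCall` rebuild it from `getOperator().createCall(...)` over `getOperandList()`, so clauses missing from the list would silently vanish. A sketch of that round trip (standard Calcite API; the `ingest` variable is assumed):

```java
// Standard Calcite round trip: generic rewrites rebuild a call from its operands.
// PARTITIONED BY / CLUSTERED BY now survive because they are operands.
List<SqlNode> operands = ingest.getOperandList();
SqlCall rebuilt = ingest.getOperator().createCall(
    ingest.getParserPosition(),
    operands.toArray(new SqlNode[0])
);
```

This is presumably also why `getOperator()` now returns a Druid-specific operator (see `DruidSqlIngestOperator.INSERT_OPERATOR` below): the stock `SqlInsert.OPERATOR` would rebuild a plain `SqlInsert` and drop the extra operands.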
@@ -24,7 +24,8 @@
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlWriter;
-import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator;

 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -38,30 +39,40 @@ public class DruidSqlInsert extends DruidSqlIngest
 {
   public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity";

-  // This allows reusing super.unparse
-  public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
-
-  /**
-   * While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
-   * disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
-   * errors when the PARTITIONED BY custom clause is not present, and keeps its error separate from JavaCC/Calcite's
-   * custom errors which can be cryptic when someone accidentally forgets to explicitly specify the PARTITIONED BY clause
-   */
   public DruidSqlInsert(
       @Nonnull SqlInsert insertNode,
-      @Nullable Granularity partitionedBy,
-      @Nullable String partitionedByStringForUnparse,
+      @Nullable SqlNode partitionedBy,
       @Nullable SqlNodeList clusteredBy
   )
   {
-    super(
+    this(
         insertNode.getParserPosition(),
         (SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
         insertNode.getTargetTable(),
         insertNode.getSource(),
         insertNode.getTargetColumnList(),
         partitionedBy,
-        partitionedByStringForUnparse,
        clusteredBy
     );
   }
+
+  public DruidSqlInsert(
+      SqlParserPos pos,
+      SqlNodeList keywords,
+      SqlNode targetTable,
+      SqlNode source,
+      SqlNodeList columnList,
+      @Nullable SqlNode partitionedBy,
+      @Nullable SqlNodeList clusteredBy
+  )
+  {
+    super(
+        pos,
+        keywords,
+        targetTable,
+        source,
+        columnList,
+        partitionedBy,
+        clusteredBy
+    );
+  }
@@ -70,15 +81,15 @@ public DruidSqlInsert(
   @Override
   public SqlOperator getOperator()
   {
-    return OPERATOR;
+    return DruidSqlIngestOperator.INSERT_OPERATOR;
   }

   @Override
   public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
   {
     super.unparse(writer, leftPrec, rightPrec);
     writer.keyword("PARTITIONED BY");
-    writer.keyword(partitionedByStringForUnparse);
+    writer.keyword(partitionedBy.toString());
     if (getClusteredBy() != null) {
       writer.keyword("CLUSTERED BY");
       SqlWriter.Frame frame = writer.startList("", "");
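
With the clause stored as a `SqlNode`, unparse now prints `partitionedBy.toString()` rather than a saved string. A round-trip sketch (assumes `GranularityGrain` symbols render as the intended SQL keywords; if the enum's default `toString()` leaks names like `ALL_TIME_GRAIN`, the unparsed text would not re-parse):

```java
// Round-trip sketch (not in this diff): render a DruidSqlInsert back to SQL text.
// SqlPrettyWriter and CalciteSqlDialect are standard Calcite classes; the
// druidSqlInsert variable is assumed to come from the parser.
SqlPrettyWriter writer = new SqlPrettyWriter(CalciteSqlDialect.DEFAULT);
druidSqlInsert.unparse(writer, 0, 0);
String sql = writer.toSqlString().getSql();
```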