Extend the PARTITIONED BY clause to accept string literals for the time partitioning #15836

Merged: 12 commits, Feb 9, 2024
27 changes: 27 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -199,6 +199,33 @@ The following ISO 8601 periods are supported for `TIME_FLOOR` and the string constant
- P3M
- P1Y

The string constant can also include any of the keywords mentioned above:

- `HOUR` - Same as `'PT1H'`
- `DAY` - Same as `'P1D'`
- `MONTH` - Same as `'P1M'`
- `YEAR` - Same as `'P1Y'`
- `ALL TIME`
- `ALL` - Alias for `ALL TIME`

The `WEEK` granularity is deprecated and not supported in MSQ.

Examples:

```SQL
-- Keyword
PARTITIONED BY HOUR

-- String constant
PARTITIONED BY 'HOUR'

-- Or
PARTITIONED BY 'PT1H'

-- TIME_FLOOR function
PARTITIONED BY TIME_FLOOR(__time, 'PT1H')
```

For more information about partitioning, see [Partitioning](concepts.md#partitioning-by-time). <br /><br />
*Avoid partitioning by week, `P1W`, because weeks don't align neatly with months and years, making it difficult to partition by coarser granularities later.
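The keyword/period equivalences above can be pictured as a simple lookup. The sketch below is a hypothetical, stdlib-only illustration (not Druid's actual implementation); the `resolve` helper and class name are invented for this example:

```java
import java.util.Map;

// Hypothetical sketch: map PARTITIONED BY keywords to their ISO 8601 equivalents.
public class GranularityKeywords {
    private static final Map<String, String> KEYWORD_TO_PERIOD = Map.of(
            "HOUR", "PT1H",
            "DAY", "P1D",
            "MONTH", "P1M",
            "YEAR", "P1Y",
            "ALL", "ALL TIME",      // ALL is an alias for ALL TIME
            "ALL TIME", "ALL TIME"
    );

    // Accepts either a keyword (HOUR) or a quoted ISO period ('PT1H') and
    // returns the canonical form; unknown tokens pass through unchanged.
    public static String resolve(String token) {
        String stripped = token.replace("'", "").toUpperCase();
        return KEYWORD_TO_PERIOD.getOrDefault(stripped, stripped);
    }
}
```

This is only meant to show why `PARTITIONED BY HOUR` and `PARTITIONED BY 'PT1H'` land on the same granularity.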

1 change: 1 addition & 0 deletions sql/src/main/codegen/config.fmpp
@@ -52,6 +52,7 @@ data: {
"org.apache.calcite.sql.SqlNode"
"org.apache.calcite.sql.SqlInsert"
"org.apache.druid.java.util.common.granularity.Granularity"
"org.apache.druid.java.util.common.granularity.GranularityType"
"org.apache.druid.java.util.common.granularity.Granularities"
"org.apache.druid.sql.calcite.parser.DruidSqlInsert"
"org.apache.druid.sql.calcite.parser.DruidSqlParserUtils"
30 changes: 12 additions & 18 deletions sql/src/main/codegen/includes/common.ftl
@@ -17,59 +17,53 @@
* under the License.
*/

// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
SqlNode PartitionGranularity() :
{
SqlNode e;
Granularity granularity;
String unparseString;
SqlNode result;
}
{
(
<HOUR>
{
granularity = Granularities.HOUR;
unparseString = "HOUR";
result = SqlLiteral.createSymbol(GranularityType.HOUR, getPos());
}
|
<DAY>
{
granularity = Granularities.DAY;
unparseString = "DAY";
result = SqlLiteral.createSymbol(GranularityType.DAY, getPos());
}
|
<MONTH>
{
granularity = Granularities.MONTH;
unparseString = "MONTH";
result = SqlLiteral.createSymbol(GranularityType.MONTH, getPos());
}
|
<YEAR>
{
granularity = Granularities.YEAR;
unparseString = "YEAR";
result = SqlLiteral.createSymbol(GranularityType.YEAR, getPos());
}
|
<ALL>
{
granularity = Granularities.ALL;
unparseString = "ALL";
result = SqlLiteral.createSymbol(GranularityType.ALL, getPos());
}
[
<TIME>
{
unparseString += " TIME";
result = SqlLiteral.createSymbol(GranularityType.ALL, getPos());
}
]
|
e = Expression(ExprContext.ACCEPT_SUB_QUERY)
{
granularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
Review thread on this line:

- Member: I think some validation should happen here. I don't see a nice way to do it, but I've found this:
  `result = new SqlLiteral(DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e), SYMBOL, getPos());`
  which might work (the typeName must be supplied; another candidate could be UNKNOWN).
- Contributor (author): Added validation here in another way; let me know if that is OK.
- Member: OK, but why not do the conversion here? You might also be able to do a similar thing on all the other branches and shortcut the transient String + Symbol handling.
- Contributor (author): Added a new SqlNode type; let me know if it's good now.

unparseString = e.toString();
// validate
DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
result = e;
}
)
{
return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
return result;
}
}
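The fallback branch above validates the expression before passing it through unchanged. A rough stdlib-only analogue of that validate-then-accept step (hypothetical; Druid's real check lives in `DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions`) might look like:

```java
import java.time.Duration;
import java.time.Period;
import java.time.format.DateTimeParseException;

// Hypothetical sketch: accept strings that parse as either a date-based
// period (P1D, P3M, ...) or a time-based duration (PT1H, P1DT1H, ...).
public class PeriodValidator {
    public static boolean isValidPeriod(String literal) {
        try {
            Period.parse(literal);   // handles P1D, P3M, P1Y, ...
            return true;
        } catch (DateTimeParseException ignored) {
            // fall through to time-based parsing
        }
        try {
            Duration.parse(literal); // handles PT1H, P1DT1H, ...
            return true;
        } catch (DateTimeParseException ignored) {
            return false;
        }
    }
}
```

The parser keeps the original SqlNode as the result and only uses the conversion as a validity check, which is the same shape as this sketch.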

11 changes: 6 additions & 5 deletions sql/src/main/codegen/includes/insert.ftl
@@ -78,13 +78,14 @@ SqlNode DruidSqlInsert() :
SqlNode DruidSqlInsertEof() :
{
SqlNode insertNode;
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
SqlNode partitionedBy = null;
SqlNodeList clusteredBy = null;
}
{
insertNode = DruidSqlInsert()
// PARTITIONED BY is necessary, but is kept optional in the grammar. It is asserted that it is not missing in the
// DruidSqlInsert constructor so that we can return a custom error message.
// PARTITIONED BY is necessary. It can be provided either in this statement or in the catalog.
// As a result, it is optional in the grammar. It is asserted that it is not missing in the
// insert analysis step.
[
<PARTITIONED> <BY>
partitionedBy = PartitionGranularity()
@@ -93,7 +94,7 @@
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
@@ -111,6 +112,6 @@ SqlNode DruidSqlInsertEof() :
return insertNode;
}
SqlInsert sqlInsert = (SqlInsert) insertNode;
return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy);
return DruidSqlInsert.create(sqlInsert, partitionedBy, clusteredBy);
}
}
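The ordering constraint enforced above (CLUSTERED BY is only legal after PARTITIONED BY) reduces to a plain precondition on the two parsed clauses. A hypothetical stdlib-only sketch, with invented class and method names:

```java
// Hypothetical sketch of the parser's precondition: CLUSTERED BY without a
// preceding PARTITIONED BY yields a parse error message, otherwise null.
public class ClauseOrderCheck {
    public static String validate(Object partitionedBy, Object clusteredBy) {
        if (clusteredBy != null && partitionedBy == null) {
            return "CLUSTERED BY found before PARTITIONED BY, "
                    + "CLUSTERED BY must come after the PARTITIONED BY clause";
        }
        return null; // no error
    }
}
```

Note that the check moved from `partitionedBy.lhs == null` to `partitionedBy == null` because the Pair was replaced by a single nullable SqlNode.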
7 changes: 3 additions & 4 deletions sql/src/main/codegen/includes/replace.ftl
@@ -25,8 +25,7 @@ SqlNode DruidSqlReplaceEof() :
SqlNodeList columnList = null;
final Span s;
SqlInsert sqlInsert;
// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
SqlNode partitionedBy = null;
SqlNodeList clusteredBy = null;
final Pair<SqlNodeList, SqlNodeList> p;
SqlNode replaceTimeQuery = null;
@@ -59,7 +58,7 @@
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
@@ -72,7 +71,7 @@
<EOF>
{
sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, table, source, columnList);
return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, replaceTimeQuery);
return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, replaceTimeQuery);
}
}

@@ -23,9 +23,10 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.calcite.util.ImmutableNullableList;

import javax.annotation.Nullable;
import java.util.List;

/**
* Common base class to the two Druid "ingest" statements: INSERT and REPLACE.
@@ -34,10 +35,8 @@
*/
public abstract class DruidSqlIngest extends SqlInsert
{
protected final Granularity partitionedBy;

// Used in the unparse function to generate the original query since we convert the string to an enum
protected final String partitionedByStringForUnparse;
@Nullable
protected final SqlNode partitionedBy;

@Nullable
protected final SqlNodeList clusteredBy;
@@ -48,19 +47,18 @@ public DruidSqlIngest(
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNode partitionedBy,
@Nullable SqlNodeList clusteredBy
)
{
super(pos, keywords, targetTable, source, columnList);

this.partitionedByStringForUnparse = partitionedByStringForUnparse;
this.partitionedBy = partitionedBy;
this.clusteredBy = clusteredBy;
}

public Granularity getPartitionedBy()
@Nullable
public SqlNode getPartitionedBy()
{
return partitionedBy;
}
@@ -70,4 +68,14 @@ public SqlNodeList getClusteredBy()
{
return clusteredBy;
}

@Override
public List<SqlNode> getOperandList()
{
return ImmutableNullableList.<SqlNode>builder()
.addAll(super.getOperandList())
.add(partitionedBy)
.add(clusteredBy)
.build();
}
}
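Exposing `partitionedBy` and `clusteredBy` through `getOperandList()` lets Calcite's generic tree utilities (copying, visiting, comparing) see the custom operands; `ImmutableNullableList` is used because either operand may be null. A stdlib-only sketch of that null-tolerant list-building idiom (hypothetical names; `List.of` would reject nulls, which is why a plain ArrayList is used here):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: build an unmodifiable operand list that tolerates
// null entries, mirroring what ImmutableNullableList provides in Calcite.
public class Operands {
    public static List<Object> operandList(List<Object> superOperands,
                                           Object partitionedBy,
                                           Object clusteredBy) {
        List<Object> all = new ArrayList<>(superOperands);
        all.add(partitionedBy); // may be null: PARTITIONED BY can come from the catalog
        all.add(clusteredBy);   // may be null: CLUSTERED BY is optional
        return Collections.unmodifiableList(all);
    }
}
```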
@@ -24,7 +24,7 @@
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.calcite.sql.parser.SqlParserPos;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -41,27 +41,40 @@ public class DruidSqlInsert extends DruidSqlIngest
// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;

/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
* errors when the PARTITIONED BY custom clause is not present, and keeps its error separate from JavaCC/Calcite's
* custom errors which can be cryptic when someone accidentally forgets to explicitly specify the PARTITIONED BY clause
*/
public DruidSqlInsert(
public static DruidSqlInsert create(
@Nonnull SqlInsert insertNode,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNode partitionedBy,
@Nullable SqlNodeList clusteredBy
)
{
super(
return new DruidSqlInsert(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList(),
partitionedBy,
partitionedByStringForUnparse,
clusteredBy
);
}

public DruidSqlInsert(
SqlParserPos pos,
SqlNodeList keywords,
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable SqlNode partitionedBy,
@Nullable SqlNodeList clusteredBy
)
{
super(
pos,
keywords,
targetTable,
source,
columnList,
partitionedBy,
clusteredBy
);
}
@@ -77,8 +90,10 @@ public SqlOperator getOperator()
public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
{
super.unparse(writer, leftPrec, rightPrec);
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);
if (getPartitionedBy() != null) {
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedBy.toString());
}
if (getClusteredBy() != null) {
writer.keyword("CLUSTERED BY");
SqlWriter.Frame frame = writer.startList("", "");
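With `partitionedBy` now nullable, `unparse` emits the clause only when it is present, instead of unconditionally writing the stored unparse string. A hypothetical stdlib-only sketch of that conditional-emission shape (invented names, plain strings standing in for Calcite's SqlWriter):

```java
// Hypothetical sketch: append optional clauses to generated SQL only when set.
public class UnparseSketch {
    public static String unparse(String insertSql, String partitionedBy, String clusteredBy) {
        StringBuilder sb = new StringBuilder(insertSql);
        if (partitionedBy != null) {
            sb.append(" PARTITIONED BY ").append(partitionedBy);
        }
        if (clusteredBy != null) {
            sb.append(" CLUSTERED BY ").append(clusteredBy);
        }
        return sb.toString();
    }
}
```

Skipping the clause when absent keeps round-tripped SQL valid for statements whose PARTITIONED BY comes from the catalog rather than the statement text.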