Extend the PARTITIONED BY clause to accept string literals for the time partitioning #15836

Merged · 12 commits · Feb 9, 2024
27 changes: 27 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -287,6 +287,33 @@ The following ISO 8601 periods are supported for `TIME_FLOOR` and the string constant:
- P3M
- P1Y

The string constant can also include any of the keywords mentioned above:

- `HOUR` - Same as `'PT1H'`
- `DAY` - Same as `'P1D'`
- `MONTH` - Same as `'P1M'`
- `YEAR` - Same as `'P1Y'`
- `ALL TIME`
- `ALL` - Alias for `ALL TIME`

The `WEEK` granularity is deprecated and not supported in MSQ.

Examples:

```SQL
-- Keyword
PARTITIONED BY HOUR

-- String literal
PARTITIONED BY 'HOUR'

-- ISO 8601 period
PARTITIONED BY 'PT1H'

-- TIME_FLOOR function
PARTITIONED BY TIME_FLOOR(__time, 'PT1H')
```
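For context, here is a hedged sketch of a complete MSQ INSERT statement using the new string-literal form. The table name, columns, and EXTERN input source are hypothetical, not taken from this PR:

```SQL
-- Hypothetical end-to-end example; PARTITIONED BY 'PT1H' is equivalent to
-- PARTITIONED BY HOUR.
INSERT INTO "events_hourly"
SELECT
  TIME_PARSE("timestamp") AS "__time",
  "user_id",
  "action"
FROM TABLE(
  EXTERN(
    '{"type":"http","uris":["https://example.com/events.json"]}',
    '{"type":"json"}',
    '[{"name":"timestamp","type":"string"},{"name":"user_id","type":"string"},{"name":"action","type":"string"}]'
  )
)
PARTITIONED BY 'PT1H'
```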

For more information about partitioning, see [Partitioning](concepts.md#partitioning-by-time). <br /><br />
*Avoid partitioning by week, `P1W`, because weeks don't align neatly with months and years, making it difficult to partition by coarser granularities later.

1 change: 1 addition & 0 deletions sql/src/main/codegen/config.fmpp
@@ -54,6 +54,7 @@ data: {
"org.apache.calcite.sql.SqlNodeList"
"org.apache.calcite.sql.SqlBasicCall"
"org.apache.druid.java.util.common.granularity.Granularity"
"org.apache.druid.java.util.common.granularity.GranularityType"
"org.apache.druid.java.util.common.granularity.Granularities"
"org.apache.druid.sql.calcite.parser.DruidSqlInsert"
"org.apache.druid.sql.calcite.parser.DruidSqlParserUtils"
Expand Down
17 changes: 8 additions & 9 deletions sql/src/main/codegen/includes/common.ftl
@@ -17,8 +17,7 @@
* under the License.
*/

// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
SqlGranularityLiteral PartitionGranularity() :
{
SqlNode e;
Granularity granularity;
@@ -52,14 +51,14 @@ org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity
|
<ALL>
{
      granularity = Granularities.ALL;
      unparseString = "ALL";
    }
    [
      <TIME>
      {
        unparseString += " TIME";
      }
]
|
e = Expression(ExprContext.ACCEPT_SUB_QUERY)
@@ -69,7 +68,7 @@
}
)
{
return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
return new SqlGranularityLiteral(granularity, unparseString, getPos());
}
}
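The `SqlGranularityLiteral` type returned here is introduced by this PR, but its source is not part of this excerpt. As a rough, assumed sketch (names and superclass plumbing may differ from the real class), it pairs the resolved `Granularity` with the original SQL text so the query can be unparsed verbatim:

```java
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
import org.apache.druid.java.util.common.granularity.Granularity;

// Assumed sketch only; the actual class in the PR may be implemented differently.
public class SqlGranularityLiteral extends SqlLiteral
{
  private final Granularity granularity;
  private final String unparseString;

  public SqlGranularityLiteral(Granularity granularity, String unparseString, SqlParserPos pos)
  {
    // Backing the literal with its original text as a CHAR value is an
    // assumption made so this sketch satisfies SqlLiteral's invariants.
    super(new NlsString(unparseString, null, null), SqlTypeName.CHAR, pos);
    this.granularity = granularity;
    this.unparseString = unparseString;
  }

  public Granularity getGranularity()
  {
    return granularity;
  }

  @Override
  public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
  {
    // Emit the text the user originally wrote ('PT1H', HOUR, ALL TIME, ...)
    // instead of a normalized form, so unparsing round-trips the query.
    writer.keyword(unparseString);
  }
}
```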

6 changes: 3 additions & 3 deletions sql/src/main/codegen/includes/insert.ftl
@@ -33,7 +33,7 @@ SqlNode DruidSqlInsertEof() :
final SqlNodeList columnList;
final Span s;
final Pair<SqlNodeList, SqlNodeList> p;
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
SqlGranularityLiteral partitionedBy = null;
SqlNodeList clusteredBy = null;
String exportFileFormat = null;
}
@@ -93,7 +93,7 @@ SqlNode DruidSqlInsertEof() :
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
@@ -112,6 +112,6 @@ SqlNode DruidSqlInsertEof() :
return insertNode;
}
SqlInsert sqlInsert = (SqlInsert) insertNode;
return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, exportFileFormat);
return DruidSqlInsert.create(sqlInsert, partitionedBy, clusteredBy, exportFileFormat);
}
}
7 changes: 3 additions & 4 deletions sql/src/main/codegen/includes/replace.ftl
@@ -26,8 +26,7 @@ SqlNode DruidSqlReplaceEof() :
final Span s;
SqlNode tableRef = null;
SqlInsert sqlInsert;
// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
SqlGranularityLiteral partitionedBy = null;
SqlNodeList clusteredBy = null;
final Pair<SqlNodeList, SqlNodeList> p;
SqlNode replaceTimeQuery = null;
@@ -78,7 +77,7 @@ SqlNode DruidSqlReplaceEof() :
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
@@ -91,7 +90,7 @@
<EOF>
{
sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, destination, source, columnList);
return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, replaceTimeQuery, exportFileFormat);
return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, replaceTimeQuery, exportFileFormat);
}
}

sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java
@@ -23,9 +23,10 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.calcite.util.ImmutableNullableList;

import javax.annotation.Nullable;
import java.util.List;

/**
* Common base class to the two Druid "ingest" statements: INSERT and REPLACE.
@@ -37,10 +38,7 @@ public abstract class DruidSqlIngest extends SqlInsert
public static final String SQL_EXPORT_FILE_FORMAT = "__exportFileFormat";

@Nullable
protected final Granularity partitionedBy;

// Used in the unparse function to generate the original query since we convert the string to an enum
protected final String partitionedByStringForUnparse;
protected final SqlGranularityLiteral partitionedBy;

@Nullable
protected final SqlNodeList clusteredBy;
@@ -53,22 +51,20 @@ public DruidSqlIngest(
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
)
{
super(pos, keywords, targetTable, source, columnList);

this.partitionedByStringForUnparse = partitionedByStringForUnparse;
this.partitionedBy = partitionedBy;
this.clusteredBy = clusteredBy;
this.exportFileFormat = exportFileFormat;
}

@Nullable
public Granularity getPartitionedBy()
public SqlGranularityLiteral getPartitionedBy()
{
return partitionedBy;
}
@@ -84,4 +80,14 @@ public String getExportFileFormat()
{
return exportFileFormat;
}

@Override
public List<SqlNode> getOperandList()
{
return ImmutableNullableList.<SqlNode>builder()
.addAll(super.getOperandList())
.add(partitionedBy)
.add(clusteredBy)
.build();
}
}
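A note on the new `getOperandList` override above (a hedged illustration, not part of the diff): exposing `partitionedBy` and `clusteredBy` as operands lets generic Calcite machinery that walks operand lists see them. For example, structural comparison of two parsed statements now also covers the PARTITIONED BY clause:

```java
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.Litmus;

public final class StatementComparison
{
  private StatementComparison() {}

  // Hypothetical helper: equalsDeep recursively compares operand lists, which
  // after this change include the SqlGranularityLiteral and CLUSTERED BY nodes.
  public static boolean sameStatement(SqlNode a, SqlNode b)
  {
    return a.equalsDeep(b, Litmus.IGNORE);
  }
}
```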
sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
@@ -24,7 +24,7 @@
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.calcite.sql.parser.SqlParserPos;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -41,28 +41,49 @@ public class DruidSqlInsert extends DruidSqlIngest
// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;

public static DruidSqlInsert create(
@Nonnull SqlInsert insertNode,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
)
{
return new DruidSqlInsert(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList(),
partitionedBy,
clusteredBy,
exportFileFormat
);
}

/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* While partitionedBy can be null as an argument to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
* errors when the PARTITIONED BY custom clause is not present, and keeps its error separate from JavaCC/Calcite's
* custom errors which can be cryptic when someone accidentally forgets to explicitly specify the PARTITIONED BY clause
*/
public DruidSqlInsert(
@Nonnull SqlInsert insertNode,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
SqlParserPos pos,
SqlNodeList keywords,
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
)
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList(),
pos,
keywords,
targetTable,
source,
columnList,
partitionedBy,
partitionedByStringForUnparse,
clusteredBy,
exportFileFormat
);
@@ -95,9 +116,9 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
getSource().unparse(writer, 0, 0);
writer.newlineAndIndent();

if (partitionedByStringForUnparse != null) {
if (getPartitionedBy() != null) {
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);
getPartitionedBy().unparse(writer, leftPrec, rightPrec);
}

if (getClusteredBy() != null) {