Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
Original file line number Diff line number Diff line change
Expand Up @@ -2202,15 +2202,21 @@ SqlNode TableRef3(ExprContext exprContext, boolean lateral) :
[ tableRef = ExtendTable(tableRef) ]
tableRef = Over(tableRef)
[ tableRef = Snapshot(tableRef) ]
[ tableRef = MatchRecognize(tableRef) ]
[
LOOKAHEAD(3)
tableRef = MatchRecognize(tableRef)
]
)
|
LOOKAHEAD(2)
[ <LATERAL> { lateral = true; } ]
tableRef = ParenthesizedExpression(exprContext)
tableRef = Over(tableRef)
tableRef = addLateral(tableRef, lateral)
[ tableRef = MatchRecognize(tableRef) ]
[
LOOKAHEAD(3)
tableRef = MatchRecognize(tableRef)
]
|
LOOKAHEAD(2)
[ <LATERAL> ] // "LATERAL" is implicit with "UNNEST", so ignore
Expand Down Expand Up @@ -3059,6 +3065,7 @@ void AddUnpivotValue(List<SqlNode> list) :
SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
{
final Span s, s0, s1, s2;
final SqlIdentifier aliasBeforeMatch;
final SqlNodeList measureList;
final SqlNodeList partitionList;
final SqlNodeList orderList;
Expand All @@ -3073,6 +3080,12 @@ SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
final SqlLiteral isStrictEnds;
}
{
[
<AS> aliasBeforeMatch = SimpleIdentifier() {
tableRef = SqlStdOperatorTable.AS.createCall(
Span.of(tableRef).end(this), tableRef, aliasBeforeMatch);
}
]
<MATCH_RECOGNIZE> { s = span(); checkNotJoin(tableRef); } <LPAREN>
(
<PARTITION> { s2 = span(); } <BY>
Expand Down Expand Up @@ -7209,7 +7222,7 @@ SqlCall MatchRecognizeCallWithModifier() :
{
final Span s;
final SqlOperator runningOp;
final SqlNode func;
final SqlNode e;
}
{
(
Expand All @@ -7218,8 +7231,8 @@ SqlCall MatchRecognizeCallWithModifier() :
<FINAL> { runningOp = SqlStdOperatorTable.FINAL; }
)
{ s = span(); }
func = NamedFunctionCall() {
return runningOp.createCall(s.end(func), func);
e = Expression3(ExprContext.ACCEPT_NON_QUERY) {
return runningOp.createCall(s.end(e), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,35 @@
* Default implementation of {@link SqlValidator}, the class was copied over because of
* CALCITE-4554.
*
* <p>Lines 202 ~ 205, Flink improves error message for functions without appropriate arguments in
* <p>Lines 219 ~ 222, Flink improves error message for functions without appropriate arguments in
* handleUnresolvedFunction.
*
* <p>Lines 1270 ~ 1272, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
* <p>Lines 1287 ~ 1289, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
*
* <p>Lines 2031 ~ 2045, Flink improves error message for functions without appropriate arguments in
* <p>Lines 2048 ~ 2062, Flink improves error message for functions without appropriate arguments in
* handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}.
*
* <p>Lines 2571 ~ 2588, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
* <p>Lines 2475 ~ 2477, CALCITE-7471 should be removed after upgrading Calcite to 1.42.0.
*
* <p>Line 2618 ~2631, set the correct scope for VECTOR_SEARCH.
* <p>Lines 2590 ~ 2609, CALCITE-7217, CALCITE-7312 should be removed after upgrading Calcite to
* 1.42.0.
*
* <p>Lines 3920 ~ 3925, 6599 ~ 6606 Flink improves Optimize the retrieval of sub-operands in
* <p>Line 2640 ~2658, set the correct scope for VECTOR_SEARCH.
*
* <p>Lines 3937 ~ 3941, 6612 ~ 6618 Flink improves Optimize the retrieval of sub-operands in
* SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
*
* <p>Lines 5340 ~ 5347, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
* <p>Lines 5357 ~ 5363, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
*
* <p>Lines 5782-5784, CALCITE-7466 should be removed after upgrading Calcite to 1.42.0.
*
* <p>Lines 5838-5840, CALCITE-7470 should be removed after upgrading Calcite to 1.42.0.
*
* <p>Lines 7267-7290, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0.
*
* <p>Lines 7335-7352, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0.
*
* <p>Lines 7397-7405, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0.
*/
public class SqlValidatorImpl implements SqlValidatorWithHints {
// ~ Static fields/initializers ---------------------------------------------
Expand Down Expand Up @@ -2459,7 +2472,9 @@ private SqlNode registerFrom(
enclosingNode,
alias,
forceNullable);
return node;
// ----- FLINK MODIFICATION BEGIN -----
return newNode;
// ----- FLINK MODIFICATION END -----

case PIVOT:
registerPivot(
Expand Down Expand Up @@ -5764,11 +5779,9 @@ private PairList<String, RelDataType> validateMeasure(
setValidatedNodeType(measure, type);

fields.add(alias, type);
sqlNodes.add(
SqlStdOperatorTable.AS.createCall(
SqlParserPos.ZERO,
expand,
new SqlIdentifier(alias, SqlParserPos.ZERO)));
// ----- FLINK MODIFICATION BEGIN -----
sqlNodes.add(expand);
// ----- FLINK MODIFICATION END -----
}

SqlNodeList list = new SqlNodeList(sqlNodes, measures.getParserPosition());
Expand Down Expand Up @@ -5822,11 +5835,9 @@ private void validateDefinitions(SqlMatchRecognize mr, MatchRecognizeScope scope

// Some extra work need required here.
// In PREV, NEXT, FINAL and LAST, only one pattern variable is allowed.
sqlNodes.add(
SqlStdOperatorTable.AS.createCall(
SqlParserPos.ZERO,
expand,
new SqlIdentifier(alias, SqlParserPos.ZERO)));
// ----- FLINK MODIFICATION BEGIN -----
sqlNodes.add(expand);
// ----- FLINK MODIFICATION END -----

final RelDataType type = deriveType(scope, expand);
if (!SqlTypeUtil.inBooleanFamily(type)) {
Expand Down Expand Up @@ -7251,19 +7262,31 @@ private class PatternValidator extends SqlBasicVisitor<@Nullable Set<String>> {
int firstLastCount;
int prevNextCount;
int aggregateCount;
// ----- FLINK MODIFICATION BEGIN -----
int index;
int argCount;

PatternValidator(boolean isMeasure) {
this(isMeasure, 0, 0, 0);
this(isMeasure, 0, 0, 0, 0, 0);
}

PatternValidator(
boolean isMeasure, int firstLastCount, int prevNextCount, int aggregateCount) {
boolean isMeasure,
int firstLastCount,
int prevNextCount,
int aggregateCount,
int index,
int argCount) {
this.isMeasure = isMeasure;
this.firstLastCount = firstLastCount;
this.prevNextCount = prevNextCount;
this.aggregateCount = aggregateCount;
this.index = index;
this.argCount = argCount;
}

// ----- FLINK MODIFICATION END -----

@Override
public Set<String> visit(SqlCall call) {
boolean isSingle = false;
Expand Down Expand Up @@ -7309,7 +7332,9 @@ public Set<String> visit(SqlCall call) {
call, Static.RESOURCE.patternRunningFunctionInDefine(call.toString()));
}

for (SqlNode node : operands) {
// ----- FLINK MODIFICATION BEGIN -----
for (int i = 0; i < operands.size(); i++) {
SqlNode node = operands.get(i);
if (node != null) {
vars.addAll(
requireNonNull(
Expand All @@ -7318,10 +7343,13 @@ public Set<String> visit(SqlCall call) {
isMeasure,
firstLastCount,
prevNextCount,
aggregateCount)),
aggregateCount,
i,
operands.size())),
() -> "node.accept(PatternValidator) for node " + node));
}
}
// ----- FLINK MODIFICATION END -----

if (isSingle) {
switch (kind) {
Expand Down Expand Up @@ -7366,7 +7394,15 @@ public Set<String> visit(SqlIdentifier identifier) {

@Override
public Set<String> visit(SqlLiteral literal) {
return ImmutableSet.of();
// ----- FLINK MODIFICATION BEGIN -----
if ((this.argCount == 1 || this.index < this.argCount - 1)
&& (this.firstLastCount > 0 || this.prevNextCount > 0)
&& !SqlUtil.isNull(literal)) {
return ImmutableSet.of(literal.toValue());
} else {
return ImmutableSet.of();
}
// ----- FLINK MODIFICATION END -----
}

@Override
Expand Down
Loading