diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java index 1aa7d5b805487..b002c45b3a5aa 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java @@ -178,35 +178,37 @@ * Default implementation of {@link SqlValidator}, the class was copied over because of * CALCITE-4554. * - *

Lines 228 ~ 231, Flink improves error message for functions without appropriate arguments in + *

Lines 230 ~ 233, Flink improves error message for functions without appropriate arguments in * handleUnresolvedFunction. * - *

Lines 1317 ~ 1319, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0. + *

Lines 1319 ~ 1321, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0. * - *

Lines 2078 ~ 2092, Flink improves error message for functions without appropriate arguments in + *

Lines 2080 ~ 2094, Flink improves error message for functions without appropriate arguments in * handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}. * - *

Lines 2505 ~ 2507, CALCITE-7471 should be removed after upgrading Calcite to 1.42.0. + *

Lines 2507 ~ 2509, CALCITE-7471 should be removed after upgrading Calcite to 1.42.0. * - *

Lines 2620 ~ 2639, CALCITE-7217, CALCITE-7312 should be removed after upgrading Calcite to + *

Lines 2622 ~ 2641, CALCITE-7217, CALCITE-7312 should be removed after upgrading Calcite to * 1.42.0. * - *

Line 2670 ~2688, set the correct scope for VECTOR_SEARCH. + *

Line 2672 ~2690, set the correct scope for VECTOR_SEARCH. * - *

Lines 4070 ~ 4074, 6758 ~ 6764 Flink improves Optimize the retrieval of sub-operands in + *

Lines 4072 ~ 4076, 6766 ~ 6772 Flink improves Optimize the retrieval of sub-operands in * SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}. * - *

Lines 5490 ~ 5496, FLINK-24352 Add null check for temporal table check on SqlSnapshot. + *

Lines 5492 ~ 5498, FLINK-24352 Add null check for temporal table check on SqlSnapshot. * - *

Lines 5930-5932, CALCITE-7466 should be removed after upgrading Calcite to 1.42.0. + *

Lines 5913-5928, CALCITE-7538 should be removed after upgrading Calcite to 1.42.0. * - *

Lines 5986-5988, CALCITE-7470 should be removed after upgrading Calcite to 1.42.0. + *

Lines 5938-5940, CALCITE-7466 should be removed after upgrading Calcite to 1.42.0. * - *

Lines 7414-7437, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0. + *

Lines 5994-5996, CALCITE-7470 should be removed after upgrading Calcite to 1.42.0. * - *

Lines 7484-7501, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0. + *

Lines 7422-7445, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0. * - *

Lines 7546-7554, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0. + *

Lines 7492-7509, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0. + * + *

Lines 7554-7562, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0. */ public class SqlValidatorImpl implements SqlValidatorWithHints { // ~ Static fields/initializers --------------------------------------------- @@ -5908,7 +5910,8 @@ public void validateMatchRecognize(SqlCall call) { private PairList validateMeasure( SqlMatchRecognize mr, MatchRecognizeScope scope, boolean allRows) { - final List aliases = new ArrayList<>(); + // FLINK MODIFICATION BEGIN + final Set aliases = new HashSet<>(); final List sqlNodes = new ArrayList<>(); final SqlNodeList measures = mr.getMeasureList(); final PairList fields = PairList.of(); @@ -5916,8 +5919,13 @@ private PairList validateMeasure( for (SqlNode measure : measures) { assert measure instanceof SqlCall; final String alias = SqlValidatorUtil.alias(measure, aliases.size()); - aliases.add(alias); - + if (!aliases.add(alias)) { + throw new CalciteException( + String.format( + "Duplicate name '%s' in MATCH_RECOGNIZE MEASURE alias list", alias), + null); + } + // FLINK MODIFICATION END SqlNode expand = expand(measure, scope); expand = navigationInMeasure(expand, allRows); setOriginal(expand, measure); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java index c0ca9fbf975c7..e3060e4f490fb 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java @@ -249,6 +249,25 @@ void testValidatingAmbiguousColumns() { .withMessageContaining("Columns ambiguously defined: {symbol, price}"); } + @TestTemplate + void testValidatingDuplicateMeasure() { + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " MEASURES\n" + + " A.symbol AS col,\n" + + " A.price AS col\n" + + " PATTERN (A)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> tEnv.executeSql(sqlQuery)) + .withMessageContaining( + "SQL validation failed. Duplicate name 'col' in MATCH_RECOGNIZE MEASURE alias list"); + } + // *************************************************************************************** // * Those validations are temporary. We should remove those tests once we support those * // * features. *