From d8a8b31f285588264fc2a511b93986c7a47f430d Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Thu, 20 Nov 2025 09:45:19 +0800 Subject: [PATCH 1/2] [FLINK-38709][table][python] Fix ScalarFunctionSplitter to allow PythonFunction & AsyncFunction work when taking the recursive field of composite type as input --- .../rules/logical/RemoteCalcSplitRule.scala | 15 +- .../rules/logical/AsyncCalcSplitRuleTest.java | 10 +- .../logical/AsyncCorrelateSplitRuleTest.java | 9 +- .../rules/logical/AsyncCalcSplitRuleTest.xml | 166 ++++++++++-------- .../logical/AsyncCorrelateSplitRuleTest.xml | 83 +++++---- 5 files changed, 175 insertions(+), 108 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala index ff60809cea680..26baa44f43d82 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala @@ -434,6 +434,8 @@ class ScalarFunctionSplitter( private var fieldsRexCall: Map[Int, Int] = Map[Int, Int]() + private val extractedRexNodeRefs: mutable.HashSet[RexNode] = mutable.HashSet[RexNode]() + override def visitCall(call: RexCall): RexNode = { if (needConvert(call)) { getExtractedRexNode(call) @@ -454,7 +456,9 @@ class ScalarFunctionSplitter( new RexInputRef(field.getIndex, field.getType) case _ => val newFieldAccess = - rexBuilder.makeFieldAccess(expr.accept(this), fieldAccess.getField.getIndex) + rexBuilder.makeFieldAccess( + convertInputRefToLocalRefIfNecessary(expr.accept(this)), + fieldAccess.getField.getIndex) getExtractedRexNode(newFieldAccess) } } else { @@ -468,9 +472,18 @@ class ScalarFunctionSplitter( override def visitNode(rexNode: RexNode): RexNode = rexNode + private def convertInputRefToLocalRefIfNecessary(node: RexNode): RexNode = { + node match { + case inputRef: RexInputRef if extractedRexNodeRefs.contains(node) => + new RexLocalRef(inputRef.getIndex, node.getType) + case _ => node + } + } + private def getExtractedRexNode(node: RexNode): RexNode = { val newNode = new RexInputRef(extractedFunctionOffset + extractedRexNodes.length, node.getType) extractedRexNodes.append(node) + extractedRexNodeRefs.add(newNode) newNode } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java index 444c17fee7e49..e566829229a2e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java @@ -62,7 +62,8 @@ public void setup() { + " a int,\n" + " b bigint,\n" + " c string,\n" - + " d ARRAY\n" + + " d ARRAY,\n" + + " e ROW, g string>" + ") WITH (\n" + " 'connector' = 'test-simple-table-source'\n" + ") ;"); @@ -89,6 +90,7 @@ public void setup() { @Test public void testSingleCall() { String sqlQuery = "SELECT func1(a) FROM MyTable"; + util.getTableEnv().explainSql(sqlQuery); util.verifyRelPlan(sqlQuery); } @@ -182,6 +184,12 @@ public void testFieldAccessAfter() { util.verifyRelPlan(sqlQuery); } + @Test + public void testCompositeFieldAsInput() { + String sqlQuery = "SELECT func1(e.f.h) from MyTable"; + util.verifyRelPlan(sqlQuery); + } + @Test public void testFieldOperand() { String sqlQuery = "SELECT func1(func5(a).f0) from MyTable"; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java index b98403a28466a..862aedb421b24 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java @@ -62,7 +62,8 @@ public void setup() { + " a int,\n" + " b bigint,\n" + " c string,\n" - + " d ARRAY\n" + + " d ARRAY,\n" + + " e ROW, g string>\n" + ") WITH (\n" + " 'connector' = 'test-simple-table-source'\n" + ") ;"); @@ -110,6 +111,12 @@ public void testCorrelateWithCast() { util.verifyRelPlan(sqlQuery); } + @Test + public void testCorrelateWithCompositeFieldAsInput() { + String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(e.f.h))"; + util.verifyRelPlan(sqlQuery); + } + /** Test function. */ public static class AsyncFunc extends AsyncTableFunction { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml index fa4f17124edbc..726b02847c8a5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml @@ -34,7 +34,25 @@ AsyncCalc(select=[a, func3($f1) AS EXPR$1]) +- GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS $f1]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +]]> + + + + + + + + + + + @@ -52,7 +70,7 @@ LogicalProject(EXPR$0=[func5($0).f0]) @@ -71,7 +89,7 @@ LogicalProject(EXPR$0=[func1(func5($0).f0)]) AsyncCalc(select=[func1(f0) AS EXPR$0]) +- Calc(select=[f0.f0 AS f0]) +- AsyncCalc(select=[func5(a) AS f0]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) ]]> @@ -88,7 +106,7 @@ LogicalProject(a=[$0], EXPR$1=[func1($0)]) @@ -107,7 +125,7 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func2($0)], EXPR$2=[func1($0)], EXPR$ Calc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f0 AS EXPR$2, f00 AS EXPR$3]) +- AsyncCalc(select=[f0, func2(a) AS f00]) +- AsyncCalc(select=[a, func1(a) AS f0]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) ]]> @@ -118,9 +136,9 @@ Calc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f0 AS EXPR$2, f00 AS EXPR$3]) (func6($0, $4), 10))], joinType=[inner]) ++- LogicalJoin(condition=[AND(=($0, $5), >(func6($0, $5), 10))], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -159,7 +177,7 @@ Calc(select=[a], where=[>(f0, 10)]) +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -174,7 +192,7 @@ Calc(select=[a], where=[>(f0, 10)]) @@ -186,7 +204,7 @@ Calc(select=[a]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a], where=[REGEXP(f0, 'val (2|3)')]) : +- AsyncCalc(select=[a, func2(a) AS f0]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2], where=[REGEXP(f0, 'val (2|3)')]) +- AsyncCalc(select=[a2, func2(a2) AS f0]) @@ -201,8 +219,8 @@ Calc(select=[a]) (func6($0, $4), 10)]) - +- LogicalJoin(condition=[=($0, $4)], joinType=[inner]) ++- LogicalFilter(condition=[>(func6($0, $5), 10)]) + +- LogicalJoin(condition=[=($0, $5)], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -214,7 +232,7 @@ Calc(select=[a], where=[>(f0, 10)]) +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -228,7 +246,7 @@ Calc(select=[a], where=[>(f0, 10)]) @@ -239,7 +257,7 @@ AsyncCalc(select=[func1(a) AS EXPR$0]) +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -270,8 +288,8 @@ AsyncCalc(select=[func1(1) AS EXPR$0]) (func6($0, $4), 10))]) - +- LogicalJoin(condition=[=($0, $4)], joinType=[left]) ++- LogicalFilter(condition=[AND(=($0, $5), >(func6($0, $5), 10))]) + +- LogicalJoin(condition=[=($0, $5)], joinType=[left]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -283,7 +301,7 @@ Calc(select=[a], where=[>(f0, 10)]) +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -296,10 +314,10 @@ Calc(select=[a], where=[>(f0, 10)]) @@ -337,7 +355,7 @@ Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[ :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a], where=[REGEXP(f0, 'string (2|3)')]) : +- AsyncCalc(select=[a, func2(a) AS f0]) -: +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -351,8 +369,8 @@ Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[ (func6($0, $4), 10)]) - +- LogicalJoin(condition=[=($0, $4)], joinType=[left]) ++- LogicalFilter(condition=[>(func6($0, $5), 10)]) + +- LogicalJoin(condition=[=($0, $5)], joinType=[left]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -364,7 +382,7 @@ Calc(select=[a], where=[>(f0, 10)]) +- Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -378,7 +396,7 @@ Calc(select=[a], where=[>(f0, 10)]) (func1($4), 10))], joinType=[left]) ++- LogicalJoin(condition=[AND(=($0, $5), >(func1($5), 10))], joinType=[left]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -389,7 +407,7 @@ Calc(select=[a]) +- Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2], where=[>(f0, 10)]) +- AsyncCalc(select=[a2, func1(a2) AS f0]) @@ -404,8 +422,8 @@ Calc(select=[a]) (func1($4), 10)]) - +- LogicalJoin(condition=[=($0, $4)], joinType=[left]) ++- LogicalFilter(condition=[>(func1($5), 10)]) + +- LogicalJoin(condition=[=($0, $5)], joinType=[left]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -417,7 +435,7 @@ Calc(select=[a], where=[>(f0, 10)]) +- Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -438,7 +456,7 @@ LogicalProject(EXPR$0=[_UTF-16LE'foo'], EXPR$1=[func1($0)]) @@ -457,7 +475,7 @@ LogicalProject(EXPR$0=[func1(func1(func1($0)))]) AsyncCalc(select=[func1(f0) AS EXPR$0]) +- AsyncCalc(select=[func1(f0) AS f0]) +- AsyncCalc(select=[func1(a) AS f0]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) ]]> @@ -493,7 +511,7 @@ LogicalProject(EXPR$0=[CONCAT(func2($0), _UTF-16LE'foo')]) @@ -513,7 +531,7 @@ LogicalProject(blah=[$0]) @@ -524,8 +542,8 @@ Calc(select=[f0 AS blah], where=[REGEXP(f0, 'string (2|3)')]) (func6($0, $4), 10))]) - +- LogicalJoin(condition=[=($0, $4)], joinType=[right]) ++- LogicalFilter(condition=[AND(=($0, $5), >(func6($0, $5), 10))]) + +- LogicalJoin(condition=[=($0, $5)], joinType=[right]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -537,7 +555,7 @@ Calc(select=[a], where=[>(f0, 10)]) +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -550,10 +568,10 @@ Calc(select=[a], where=[>(f0, 10)]) (func6($0, $4), 10)]) - +- LogicalJoin(condition=[=($0, $4)], joinType=[right]) ++- LogicalFilter(condition=[>(func6($0, $5), 10)]) + +- LogicalJoin(condition=[=($0, $5)], joinType=[right]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -592,7 +610,7 @@ Calc(select=[a], where=[>(f0, 10)]) +- Join(joinType=[RightOuterJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -606,7 +624,7 @@ Calc(select=[a], where=[>(f0, 10)]) (func1($0), 10))], joinType=[right]) ++- LogicalJoin(condition=[AND(=($0, $5), >(func1($0), 10))], joinType=[right]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -618,7 +636,7 @@ Calc(select=[a]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a], where=[>(f0, 10)]) : +- AsyncCalc(select=[a, func1(a) AS f0]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -633,7 +651,7 @@ Calc(select=[a]) (func1($0), 10)]) - +- LogicalJoin(condition=[=($0, $4)], joinType=[right]) + +- LogicalJoin(condition=[=($0, $5)], joinType=[right]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) ]]> @@ -645,7 +663,7 @@ Calc(select=[a], where=[>(f0, 10)]) +- Join(joinType=[RightOuterJoin], where=[=(a, a2)], select=[a, a2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) @@ -665,7 +683,7 @@ LogicalProject(EXPR$0=[func4($3)]) @@ -682,7 +700,7 @@ LogicalProject(EXPR$0=[func1($0)]) @@ -703,7 +721,7 @@ AsyncCalc(select=[f0 AS EXPR$0, func1(f1) AS EXPR$1, f2 AS EXPR$2]) +- AsyncCalc(select=[f2, f1, func1(f0) AS f0]) +- Calc(select=[f0, f0 AS f1, f0 AS f2]) +- AsyncCalc(select=[func1(a) AS f0]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) ]]> @@ -721,7 +739,7 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func1($0)]) @@ -740,7 +758,7 @@ LogicalProject(a=[$0]) @@ -759,7 +777,7 @@ LogicalProject(EXPR$0=[func2($0)]) @@ -779,7 +797,7 @@ LogicalProject(blah=[$0]) @@ -798,7 +816,7 @@ LogicalProject(a=[$0]) =(f0, 12)]) +- AsyncCalc(select=[a, func1(a) AS f0]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) ]]> @@ -822,7 +840,7 @@ AsyncCalc(select=[func1(a) AS EXPR$0]) +- Join(joinType=[LeftAntiJoin], where=[OR(IS NULL(a), IS NULL(a2), =(a, a2))], select=[a], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[single]) : +- Calc(select=[a]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e]) +- Exchange(distribution=[single]) +- Calc(select=[a2]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a2, b2, c2, d2]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml index dcbe743d2c2c7..2cdbfbaf162f8 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml @@ -22,7 +22,7 @@ limitations under the License. @@ -43,7 +43,7 @@ Calc(select=[a, b, c, d, EXPR$0]) @@ -64,7 +64,7 @@ Calc(select=[a, b, c, d, EXPR$0]) @@ -86,7 +86,7 @@ Calc(select=[a, b, c, d, EXPR$0]) + + + + + + + + + + + @@ -107,7 +128,7 @@ Calc(select=[a, b, c, d, EXPR$0]) @@ -128,7 +149,7 @@ Calc(select=[a, b, c, d, EXPR$0]) From fd941b58fad61e6c61927060435a71144b563b34 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Mon, 24 Nov 2025 14:44:58 +0800 Subject: [PATCH 2/2] minor: remove unnecessary change --- .../table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java index e566829229a2e..53ba394f5f07d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java @@ -90,7 +90,6 @@ public void setup() { @Test public void testSingleCall() { String sqlQuery = "SELECT func1(a) FROM MyTable"; - util.getTableEnv().explainSql(sqlQuery); util.verifyRelPlan(sqlQuery); }