diff --git a/docs/content.zh/docs/concepts/sql-table-concepts/overview.md b/docs/content.zh/docs/concepts/sql-table-concepts/overview.md index 85c95dedddbc3..3023b65e2c7a6 100644 --- a/docs/content.zh/docs/concepts/sql-table-concepts/overview.md +++ b/docs/content.zh/docs/concepts/sql-table-concepts/overview.md @@ -260,8 +260,12 @@ Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO enriched_ord - SQL 语法 ```sql - COMPILE PLAN [IF NOT EXISTS] FOR |; - + -- 将编译后的计划写入文件: + COMPILE PLAN '' [IF NOT EXISTS] FOR |; + + -- 或将编译后的计划作为一行一列的 STRING 结果集内联返回: + COMPILE PLAN FOR |; + statement_set: EXECUTE STATEMENT SET BEGIN @@ -269,11 +273,11 @@ Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO enriched_ord ... insert_statement; END; - + insert_statement: | ``` - 该语句会在指定位置 `/path/to/plan.json` 生成一个 JSON 文件。 + 文件形式会在 `` 生成 JSON 文件。`IF NOT EXISTS` 仅在文件形式下有效,用于在文件已存在时跳过编译。内联形式返回相同的 JSON 作为结果集,适用于与执行器不共享文件系统的 SQL Gateway / SQL Client / JDBC 客户端。 {{< hint info >}} `COMPILE PLAN` 语句支持写入 `hdfs://` 或 `s3://` 等 Flink 支持的[文件系统]({{< ref "docs/deployment/filesystems/overview" >}})。 diff --git a/docs/content/docs/concepts/sql-table-concepts/overview.md b/docs/content/docs/concepts/sql-table-concepts/overview.md index f9c5b4577ae1b..a15ea4eddaf8a 100644 --- a/docs/content/docs/concepts/sql-table-concepts/overview.md +++ b/docs/content/docs/concepts/sql-table-concepts/overview.md @@ -278,8 +278,12 @@ Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO enriched_ord - SQL Syntax ```sql - COMPILE PLAN [IF NOT EXISTS] FOR |; - + -- Write the compiled plan to a file: + COMPILE PLAN '' [IF NOT EXISTS] FOR |; + + -- Or return the compiled plan inline as a single-row, single-column STRING result set: + COMPILE PLAN FOR |; + statement_set: EXECUTE STATEMENT SET BEGIN @@ -287,11 +291,11 @@ Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO enriched_ord ... insert_statement; END; - + insert_statement: | ``` - This will generate a JSON file at `/path/to/plan.json`. + The file form writes a JSON file at ``. `IF NOT EXISTS` skips compilation when the file already exists and is only valid in the file form. The inline form returns the same JSON as a result set, which is convenient for SQL Gateway / SQL Client / JDBC clients that do not share a filesystem with the executor. {{< hint info >}} `COMPILE PLAN` statement supports writing the plan to a remote [filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) scheme like `hdfs://` or `s3://`. diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java index d64ed8f35f8c4..50696f4e63f9a 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java @@ -465,6 +465,40 @@ void testGetOperationSchemaUntilOperationIsReady() throws Exception { task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema())); } + @Test + void testCompilePlanInline() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration()); + + String sourceDdl = "CREATE TABLE src (a INT, b STRING) WITH ('connector' = 'datagen')"; + String sinkDdl = "CREATE TABLE snk (a INT, b STRING) WITH ('connector' = 'blackhole')"; + String compileSql = "COMPILE PLAN FOR INSERT INTO snk SELECT * FROM src"; + + awaitOperationTermination( + service, + sessionHandle, + service.executeStatement(sessionHandle, sourceDdl, -1, configuration)); + awaitOperationTermination( + service, + sessionHandle, + service.executeStatement(sessionHandle, sinkDdl, -1, configuration)); + + OperationHandle handle = + service.executeStatement(sessionHandle, compileSql, -1, configuration); + awaitOperationTermination(service, sessionHandle, handle); + + assertThat(service.getOperationResultSchema(sessionHandle, handle)) + .isEqualTo(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))); + + List rows = fetchAllResults(service, sessionHandle, handle); + assertThat(rows).hasSize(1); + + String planJson = rows.get(0).getString(0).toString(); + assertThat(planJson).contains("\"flinkVersion\""); + assertThat(planJson).contains("stream-exec-table-source-scan"); + assertThat(planJson).contains("stream-exec-sink"); + } + @Test void testShowJobsOperation(@InjectClusterClient RestClusterClient restClusterClient) throws Exception { diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index cf01aee2f7e9a..7c9863b06cc9b 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -3188,16 +3188,19 @@ SqlNode SqlCompileAndExecutePlan() : SqlNode SqlCompilePlan() : { SqlParserPos startPos; - SqlNode filePath; + SqlNode filePath = null; boolean ifNotExists; SqlNode operand; } { { startPos = getPos(); } - filePath = StringLiteral() - - ifNotExists = IfNotExistsOpt() + ( + filePath = StringLiteral() + ifNotExists = IfNotExistsOpt() + | + { ifNotExists = false; } + ) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCompilePlan.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCompilePlan.java index 63229ebb55a9e..15bc5828d1fa0 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCompilePlan.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCompilePlan.java @@ -32,13 +32,15 @@ import org.apache.calcite.sql.parser.SqlParserPos; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collections; import java.util.List; /** - * AST node for {@code COMPILE PLAN 'planfile' [IF NOT EXISTS] FOR [DML]}. DML can be either a - * {@link RichSqlInsert} or a {@link SqlStatementSet}. + * AST node for {@code COMPILE PLAN ['planfile' [IF NOT EXISTS]] FOR [DML]}. DML can be either a + * {@link RichSqlInsert} or a {@link SqlStatementSet}. When the plan file path is omitted, the + * compiled plan is returned inline as a result set instead of being written to disk. */ @Internal public class SqlCompilePlan extends SqlCall { @@ -46,18 +48,24 @@ public class SqlCompilePlan extends SqlCall { public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("COMPILE PLAN", SqlKind.OTHER); - private final SqlNode planFile; + @Nullable private final SqlNode planFile; private final boolean ifNotExists; private SqlNode operand; public SqlCompilePlan( - SqlParserPos pos, SqlNode planFile, boolean ifNotExists, SqlNode operand) { + SqlParserPos pos, @Nullable SqlNode planFile, boolean ifNotExists, SqlNode operand) { super(pos); + if (planFile == null && ifNotExists) { + throw new IllegalArgumentException( + "IF NOT EXISTS is only valid when a plan file path is specified."); + } this.planFile = planFile; this.ifNotExists = ifNotExists; this.operand = checkOperand(operand); } + /** Returns the plan file path, or null if the plan should be returned inline. */ + @Nullable public String getPlanFile() { return SqlParseUtils.extractString(planFile); } @@ -82,11 +90,13 @@ public List getOperandList() { public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("COMPILE"); writer.keyword("PLAN"); - planFile.unparse(writer, leftPrec, rightPrec); - if (isIfNotExists()) { - writer.keyword("IF"); - writer.keyword("NOT"); - writer.keyword("EXISTS"); + if (planFile != null) { + planFile.unparse(writer, leftPrec, rightPrec); + if (isIfNotExists()) { + writer.keyword("IF"); + writer.keyword("NOT"); + writer.keyword("EXISTS"); + } } writer.keyword("FOR"); operand.unparse(writer, leftPrec, rightPrec); diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index c147caec32f8e..ee82317345f16 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -2925,6 +2925,27 @@ void testCompilePlan() { + "FROM `T3`\n" + ";\n" + "END"); + + // Inline (no file path) — plan returned as result set + sql("compile plan for insert into t1 select * from t2") + .ok("COMPILE PLAN FOR INSERT INTO `T1`\n" + "SELECT *\n" + "FROM `T2`"); + sql("compile plan for statement set " + + "begin insert into t1 select * from t2; insert into t2 select * from t3; end") + .ok( + "COMPILE PLAN FOR STATEMENT SET BEGIN\n" + + "INSERT INTO `T1`\n" + + "SELECT *\n" + + "FROM `T2`\n" + + ";\n" + + "INSERT INTO `T2`\n" + + "SELECT *\n" + + "FROM `T3`\n" + + ";\n" + + "END"); + + // IF NOT EXISTS is only valid when a file path is specified. + sql("compile plan ^if^ not exists for insert into t1 select * from t2") + .fails("(?s).*Encountered \"if\" at line 1, column 14.\n.*"); } @Test diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 4743663e8a402..74deb779bcadd 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -1022,17 +1022,7 @@ private CompiledPlan compilePlanAndWrite( } } - CompiledPlan compiledPlan; - if (operation instanceof StatementSetOperation) { - compiledPlan = compilePlan(((StatementSetOperation) operation).getOperations()); - } else if (operation instanceof ModifyOperation) { - compiledPlan = compilePlan(Collections.singletonList((ModifyOperation) operation)); - } else { - throw new TableException( - "Unsupported operation to compile: " - + operation.getClass() - + ". This is a bug, please file an issue."); - } + CompiledPlan compiledPlan = compileOperationToPlan(operation); resourceManager.syncFileResource( planResource, path -> compiledPlan.writeToFile(path, false)); return compiledPlan; @@ -1048,6 +1038,20 @@ public CompiledPlan compilePlan(List operations) { return new CompiledPlanImpl(this, planner.compilePlan(operations)); } + /** Compile an operation (ModifyOperation or StatementSetOperation) into a plan. */ + private CompiledPlan compileOperationToPlan(Operation operation) { + if (operation instanceof StatementSetOperation) { + return compilePlan(((StatementSetOperation) operation).getOperations()); + } else if (operation instanceof ModifyOperation) { + return compilePlan(Collections.singletonList((ModifyOperation) operation)); + } else { + throw new TableException( + "Unsupported operation to compile: " + + operation.getClass() + + ". This is a bug, please file an issue."); + } + } + @Override public TableResultInternal executeInternal(List operations) { List mapOperations = new ArrayList<>(); @@ -1360,11 +1364,19 @@ public TableResultInternal executeInternal(Operation operation) { } } else if (operation instanceof CompilePlanOperation) { CompilePlanOperation compilePlanOperation = (CompilePlanOperation) operation; - compilePlanAndWrite( - compilePlanOperation.getFilePath(), - compilePlanOperation.isIfNotExists(), - compilePlanOperation.getOperation()); - return TableResultImpl.TABLE_RESULT_OK; + if (compilePlanOperation.getFilePath() != null) { + compilePlanAndWrite( + compilePlanOperation.getFilePath(), + compilePlanOperation.isIfNotExists(), + compilePlanOperation.getOperation()); + return TableResultImpl.TABLE_RESULT_OK; + } else { + // No file path — return the compiled plan inline as a result set. + CompiledPlan compiledPlan = + compileOperationToPlan(compilePlanOperation.getOperation()); + return TableResultUtils.buildStringArrayResult( + "result", new String[] {compiledPlan.asJsonString()}); + } } else if (operation instanceof CompileAndExecutePlanOperation) { CompileAndExecutePlanOperation compileAndExecutePlanOperation = (CompileAndExecutePlanOperation) operation; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CompilePlanOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CompilePlanOperation.java index 6244dfaa72ee9..79afb9fb61584 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CompilePlanOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CompilePlanOperation.java @@ -24,23 +24,34 @@ import org.apache.flink.table.operations.StatementSetOperation; import org.apache.flink.util.Preconditions; -/** Operation to describe an {@code COMPILE PLAN} statement. */ +import javax.annotation.Nullable; + +/** + * Operation to describe a {@code COMPILE PLAN} statement. When {@code filePath} is null, the + * compiled plan should be returned inline as a single-row, single-column {@code STRING} result set + * instead of being written to disk. + */ @Internal public class CompilePlanOperation implements Operation { - private final String filePath; + @Nullable private final String filePath; private final boolean ifNotExists; private final Operation operation; - public CompilePlanOperation(String filePath, boolean ifNotExists, Operation operation) { + public CompilePlanOperation( + @Nullable String filePath, boolean ifNotExists, Operation operation) { Preconditions.checkArgument( operation instanceof StatementSetOperation || operation instanceof ModifyOperation, "child operation of CompileOperation must be either a ModifyOperation or a StatementSetOperation"); + Preconditions.checkArgument( + filePath != null || !ifNotExists, + "IF NOT EXISTS is only valid when a file path is specified"); this.filePath = filePath; this.ifNotExists = ifNotExists; this.operation = operation; } + @Nullable public String getFilePath() { return filePath; } @@ -55,6 +66,9 @@ public Operation getOperation() { @Override public String asSummaryString() { + if (filePath == null) { + return String.format("COMPILE PLAN FOR %s", operation.asSummaryString()); + } return String.format( ifNotExists ? "COMPILE PLAN '%s' IF NOT EXISTS FOR %s" : "COMPILE PLAN '%s' FOR %s", filePath, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java index 0d5c13d48a538..c4d8b8ab87229 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java @@ -22,10 +22,14 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.planner.utils.JsonPlanTestBase; import org.apache.flink.table.planner.utils.JsonTestUtils; import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.flink.testutils.junit.utils.TempDirUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.BeforeEach; @@ -219,6 +223,54 @@ void testCompilePlan() throws Exception { assertResult(DATA, sinkPath); } + @Test + void testCompilePlanInline() throws Exception { + File sinkPath = createSourceSinkTables(); + + TableResult tableResult = + tableEnv.executeSql("COMPILE PLAN FOR INSERT INTO sink SELECT * FROM src"); + + assertThat(tableResult.getResultKind()).isEqualTo(ResultKind.SUCCESS_WITH_CONTENT); + assertThat(tableResult.getResolvedSchema()) + .isEqualTo(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))); + + List rows = CollectionUtil.iteratorToList(tableResult.collect()); + assertThat(rows).hasSize(1); + String planJson = rows.get(0).getFieldAs(0); + assertThat(planJson).contains("\"flinkVersion\""); + assertThat(planJson).contains("stream-exec-table-source-scan"); + assertThat(planJson).contains("stream-exec-sink"); + + // The returned JSON must be a valid, executable plan. + tableEnv.loadPlan(PlanReference.fromJsonString(planJson)).execute().await(); + assertResult(DATA, sinkPath); + } + + @Test + void testCompilePlanInlineWithStatementSet() throws Exception { + createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION); + createTestCsvSinkTable("sinkA", COLUMNS_DEFINITION); + createTestCsvSinkTable("sinkB", COLUMNS_DEFINITION); + + TableResult tableResult = + tableEnv.executeSql( + "COMPILE PLAN FOR STATEMENT SET BEGIN " + + "INSERT INTO sinkA SELECT * FROM src; " + + "INSERT INTO sinkB SELECT * FROM src; " + + "END"); + + assertThat(tableResult.getResultKind()).isEqualTo(ResultKind.SUCCESS_WITH_CONTENT); + assertThat(tableResult.getResolvedSchema()) + .isEqualTo(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))); + + List rows = CollectionUtil.iteratorToList(tableResult.collect()); + assertThat(rows).hasSize(1); + String planJson = rows.get(0).getFieldAs(0); + assertThat(planJson).contains("\"flinkVersion\""); + assertThat(planJson).contains("sinkA"); + assertThat(planJson).contains("sinkB"); + } + @Test void testCompilePlanWithStatementSet() throws Exception { Path planPath =