Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions docs/content.zh/docs/concepts/sql-table-concepts/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,20 +260,24 @@ Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO enriched_ord
- SQL 语法

```sql
COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR <insert_statement>|<statement_set>;

-- 将编译后的计划写入文件:
COMPILE PLAN '<plan_file_path>' [IF NOT EXISTS] FOR <insert_statement>|<statement_set>;

-- 或将编译后的计划作为一行一列的 STRING 结果集内联返回:
COMPILE PLAN FOR <insert_statement>|<statement_set>;

statement_set:
EXECUTE STATEMENT SET
BEGIN
insert_statement;
...
insert_statement;
END;

insert_statement:
<insert_from_select>|<insert_from_values>
```
该语句会在指定位置 `/path/to/plan.json` 生成一个 JSON 文件。
文件形式会在 `<plan_file_path>` 生成 JSON 文件。`IF NOT EXISTS` 仅在文件形式下有效,用于在文件已存在时跳过编译。内联形式返回相同的 JSON 作为结果集,适用于与执行器不共享文件系统的 SQL Gateway / SQL Client / JDBC 客户端

{{< hint info >}}
`COMPILE PLAN` 语句支持写入 `hdfs://` 或 `s3://` 等 Flink 支持的[文件系统]({{< ref "docs/deployment/filesystems/overview" >}})。
Expand Down
12 changes: 8 additions & 4 deletions docs/content/docs/concepts/sql-table-concepts/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,20 +278,24 @@ Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO enriched_ord
- SQL Syntax

```sql
COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR <insert_statement>|<statement_set>;
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in the current docs. [IF NOT EXISTS] should be after the file path.


-- Write the compiled plan to a file:
COMPILE PLAN '<plan_file_path>' [IF NOT EXISTS] FOR <insert_statement>|<statement_set>;

-- Or return the compiled plan inline as a single-row, single-column STRING result set:
COMPILE PLAN FOR <insert_statement>|<statement_set>;

statement_set:
EXECUTE STATEMENT SET
BEGIN
insert_statement;
...
insert_statement;
END;

insert_statement:
<insert_from_select>|<insert_from_values>
```
This will generate a JSON file at `/path/to/plan.json`.
The file form writes a JSON file at `<plan_file_path>`. `IF NOT EXISTS` skips compilation when the file already exists and is only valid in the file form. The inline form returns the same JSON as a result set, which is convenient for SQL Gateway / SQL Client / JDBC clients that do not share a filesystem with the executor.

{{< hint info >}}
`COMPILE PLAN` statement supports writing the plan to a remote [filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) scheme like `hdfs://` or `s3://`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,40 @@ void testGetOperationSchemaUntilOperationIsReady() throws Exception {
task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
}

@Test
void testCompilePlanInline() throws Exception {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());

String sourceDdl = "CREATE TABLE src (a INT, b STRING) WITH ('connector' = 'datagen')";
String sinkDdl = "CREATE TABLE snk (a INT, b STRING) WITH ('connector' = 'blackhole')";
String compileSql = "COMPILE PLAN FOR INSERT INTO snk SELECT * FROM src";

awaitOperationTermination(
service,
sessionHandle,
service.executeStatement(sessionHandle, sourceDdl, -1, configuration));
awaitOperationTermination(
service,
sessionHandle,
service.executeStatement(sessionHandle, sinkDdl, -1, configuration));

OperationHandle handle =
service.executeStatement(sessionHandle, compileSql, -1, configuration);
awaitOperationTermination(service, sessionHandle, handle);

assertThat(service.getOperationResultSchema(sessionHandle, handle))
.isEqualTo(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())));

List<RowData> rows = fetchAllResults(service, sessionHandle, handle);
assertThat(rows).hasSize(1);

String planJson = rows.get(0).getString(0).toString();
assertThat(planJson).contains("\"flinkVersion\"");
assertThat(planJson).contains("stream-exec-table-source-scan");
assertThat(planJson).contains("stream-exec-sink");
}

@Test
void testShowJobsOperation(@InjectClusterClient RestClusterClient<?> restClusterClient)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3188,16 +3188,19 @@ SqlNode SqlCompileAndExecutePlan() :
SqlNode SqlCompilePlan() :
{
SqlParserPos startPos;
SqlNode filePath;
SqlNode filePath = null;
boolean ifNotExists;
SqlNode operand;
}
{
<COMPILE> <PLAN> { startPos = getPos(); }

filePath = StringLiteral()

ifNotExists = IfNotExistsOpt()
(
filePath = StringLiteral()
ifNotExists = IfNotExistsOpt()
|
{ ifNotExists = false; }
)

<FOR>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,40 @@
import org.apache.calcite.sql.parser.SqlParserPos;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

/**
* AST node for {@code COMPILE PLAN 'planfile' [IF NOT EXISTS] FOR [DML]}. DML can be either a
* {@link RichSqlInsert} or a {@link SqlStatementSet}.
* AST node for {@code COMPILE PLAN ['planfile' [IF NOT EXISTS]] FOR [DML]}. DML can be either a
* {@link RichSqlInsert} or a {@link SqlStatementSet}. When the plan file path is omitted, the
* compiled plan is returned inline as a result set instead of being written to disk.
*/
@Internal
public class SqlCompilePlan extends SqlCall {

public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("COMPILE PLAN", SqlKind.OTHER);

private final SqlNode planFile;
@Nullable private final SqlNode planFile;
private final boolean ifNotExists;
private SqlNode operand;

public SqlCompilePlan(
SqlParserPos pos, SqlNode planFile, boolean ifNotExists, SqlNode operand) {
SqlParserPos pos, @Nullable SqlNode planFile, boolean ifNotExists, SqlNode operand) {
super(pos);
if (planFile == null && ifNotExists) {
throw new IllegalArgumentException(
"IF NOT EXISTS is only valid when a plan file path is specified.");
}
this.planFile = planFile;
this.ifNotExists = ifNotExists;
this.operand = checkOperand(operand);
}

/** Returns the plan file path, or null if the plan should be returned inline. */
@Nullable
public String getPlanFile() {
return SqlParseUtils.extractString(planFile);
}
Expand All @@ -82,11 +90,13 @@ public List<SqlNode> getOperandList() {
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("COMPILE");
writer.keyword("PLAN");
planFile.unparse(writer, leftPrec, rightPrec);
if (isIfNotExists()) {
writer.keyword("IF");
writer.keyword("NOT");
writer.keyword("EXISTS");
if (planFile != null) {
planFile.unparse(writer, leftPrec, rightPrec);
if (isIfNotExists()) {
writer.keyword("IF");
writer.keyword("NOT");
writer.keyword("EXISTS");
}
}
writer.keyword("FOR");
operand.unparse(writer, leftPrec, rightPrec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2925,6 +2925,27 @@ void testCompilePlan() {
+ "FROM `T3`\n"
+ ";\n"
+ "END");

// Inline (no file path) — plan returned as result set
sql("compile plan for insert into t1 select * from t2")
.ok("COMPILE PLAN FOR INSERT INTO `T1`\n" + "SELECT *\n" + "FROM `T2`");
sql("compile plan for statement set "
+ "begin insert into t1 select * from t2; insert into t2 select * from t3; end")
.ok(
"COMPILE PLAN FOR STATEMENT SET BEGIN\n"
+ "INSERT INTO `T1`\n"
+ "SELECT *\n"
+ "FROM `T2`\n"
+ ";\n"
+ "INSERT INTO `T2`\n"
+ "SELECT *\n"
+ "FROM `T3`\n"
+ ";\n"
+ "END");

// IF NOT EXISTS is only valid when a file path is specified.
sql("compile plan ^if^ not exists for insert into t1 select * from t2")
.fails("(?s).*Encountered \"if\" at line 1, column 14.\n.*");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,17 +1022,7 @@ private CompiledPlan compilePlanAndWrite(
}
}

CompiledPlan compiledPlan;
if (operation instanceof StatementSetOperation) {
compiledPlan = compilePlan(((StatementSetOperation) operation).getOperations());
} else if (operation instanceof ModifyOperation) {
compiledPlan = compilePlan(Collections.singletonList((ModifyOperation) operation));
} else {
throw new TableException(
"Unsupported operation to compile: "
+ operation.getClass()
+ ". This is a bug, please file an issue.");
}
CompiledPlan compiledPlan = compileOperationToPlan(operation);
resourceManager.syncFileResource(
planResource, path -> compiledPlan.writeToFile(path, false));
return compiledPlan;
Expand All @@ -1048,6 +1038,20 @@ public CompiledPlan compilePlan(List<ModifyOperation> operations) {
return new CompiledPlanImpl(this, planner.compilePlan(operations));
}

/** Compile an operation (ModifyOperation or StatementSetOperation) into a plan. */
private CompiledPlan compileOperationToPlan(Operation operation) {
if (operation instanceof StatementSetOperation) {
return compilePlan(((StatementSetOperation) operation).getOperations());
} else if (operation instanceof ModifyOperation) {
return compilePlan(Collections.singletonList((ModifyOperation) operation));
} else {
throw new TableException(
"Unsupported operation to compile: "
+ operation.getClass()
+ ". This is a bug, please file an issue.");
}
}

@Override
public TableResultInternal executeInternal(List<ModifyOperation> operations) {
List<ModifyOperation> mapOperations = new ArrayList<>();
Expand Down Expand Up @@ -1360,11 +1364,19 @@ public TableResultInternal executeInternal(Operation operation) {
}
} else if (operation instanceof CompilePlanOperation) {
CompilePlanOperation compilePlanOperation = (CompilePlanOperation) operation;
compilePlanAndWrite(
compilePlanOperation.getFilePath(),
compilePlanOperation.isIfNotExists(),
compilePlanOperation.getOperation());
return TableResultImpl.TABLE_RESULT_OK;
if (compilePlanOperation.getFilePath() != null) {
compilePlanAndWrite(
compilePlanOperation.getFilePath(),
compilePlanOperation.isIfNotExists(),
compilePlanOperation.getOperation());
return TableResultImpl.TABLE_RESULT_OK;
} else {
// No file path — return the compiled plan inline as a result set.
CompiledPlan compiledPlan =
compileOperationToPlan(compilePlanOperation.getOperation());
return TableResultUtils.buildStringArrayResult(
"result", new String[] {compiledPlan.asJsonString()});
}
} else if (operation instanceof CompileAndExecutePlanOperation) {
CompileAndExecutePlanOperation compileAndExecutePlanOperation =
(CompileAndExecutePlanOperation) operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,34 @@
import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.util.Preconditions;

/** Operation to describe an {@code COMPILE PLAN} statement. */
import javax.annotation.Nullable;

/**
* Operation to describe a {@code COMPILE PLAN} statement. When {@code filePath} is null, the
* compiled plan should be returned inline as a single-row, single-column {@code STRING} result set
* instead of being written to disk.
*/
@Internal
public class CompilePlanOperation implements Operation {

private final String filePath;
@Nullable private final String filePath;
private final boolean ifNotExists;
private final Operation operation;

public CompilePlanOperation(String filePath, boolean ifNotExists, Operation operation) {
public CompilePlanOperation(
@Nullable String filePath, boolean ifNotExists, Operation operation) {
Preconditions.checkArgument(
operation instanceof StatementSetOperation || operation instanceof ModifyOperation,
"child operation of CompileOperation must be either a ModifyOperation or a StatementSetOperation");
Preconditions.checkArgument(
filePath != null || !ifNotExists,
"IF NOT EXISTS is only valid when a file path is specified");
this.filePath = filePath;
this.ifNotExists = ifNotExists;
this.operation = operation;
}

@Nullable
public String getFilePath() {
return filePath;
}
Expand All @@ -55,6 +66,9 @@ public Operation getOperation() {

@Override
public String asSummaryString() {
if (filePath == null) {
return String.format("COMPILE PLAN FOR %s", operation.asSummaryString());
}
return String.format(
ifNotExists ? "COMPILE PLAN '%s' IF NOT EXISTS FOR %s" : "COMPILE PLAN '%s' FOR %s",
filePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.table.planner.utils.JsonTestUtils;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;

import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -219,6 +223,54 @@ void testCompilePlan() throws Exception {
assertResult(DATA, sinkPath);
}

@Test
void testCompilePlanInline() throws Exception {
File sinkPath = createSourceSinkTables();

TableResult tableResult =
tableEnv.executeSql("COMPILE PLAN FOR INSERT INTO sink SELECT * FROM src");

assertThat(tableResult.getResultKind()).isEqualTo(ResultKind.SUCCESS_WITH_CONTENT);
assertThat(tableResult.getResolvedSchema())
.isEqualTo(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())));

List<Row> rows = CollectionUtil.iteratorToList(tableResult.collect());
assertThat(rows).hasSize(1);
String planJson = rows.get(0).getFieldAs(0);
assertThat(planJson).contains("\"flinkVersion\"");
assertThat(planJson).contains("stream-exec-table-source-scan");
assertThat(planJson).contains("stream-exec-sink");

// The returned JSON must be a valid, executable plan.
tableEnv.loadPlan(PlanReference.fromJsonString(planJson)).execute().await();
assertResult(DATA, sinkPath);
}

@Test
void testCompilePlanInlineWithStatementSet() throws Exception {
createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
createTestCsvSinkTable("sinkA", COLUMNS_DEFINITION);
createTestCsvSinkTable("sinkB", COLUMNS_DEFINITION);

TableResult tableResult =
tableEnv.executeSql(
"COMPILE PLAN FOR STATEMENT SET BEGIN "
+ "INSERT INTO sinkA SELECT * FROM src; "
+ "INSERT INTO sinkB SELECT * FROM src; "
+ "END");

assertThat(tableResult.getResultKind()).isEqualTo(ResultKind.SUCCESS_WITH_CONTENT);
assertThat(tableResult.getResolvedSchema())
.isEqualTo(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())));

List<Row> rows = CollectionUtil.iteratorToList(tableResult.collect());
assertThat(rows).hasSize(1);
String planJson = rows.get(0).getFieldAs(0);
assertThat(planJson).contains("\"flinkVersion\"");
assertThat(planJson).contains("sinkA");
assertThat(planJson).contains("sinkB");
}

@Test
void testCompilePlanWithStatementSet() throws Exception {
Path planPath =
Expand Down