diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java index 10341326747e2..da19e6d8578e7 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java @@ -25,9 +25,13 @@ import org.apache.flink.table.client.cli.utils.TestSqlStatement; import org.apache.flink.table.client.gateway.Executor; import org.apache.flink.table.client.gateway.SingleSessionManager; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; import org.apache.flink.table.gateway.service.context.DefaultContext; import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; +import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory; import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.flink.test.junit5.InjectClusterClientConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -165,6 +169,7 @@ static void setup(@InjectClusterClientConfiguration Configuration configuration) replaceVars.put("$VAR_PIPELINE_JARS_URL", udfDependency.toString()); replaceVars.put("$VAR_REST_PORT", configuration.get(PORT).toString()); replaceVars.put("$VAR_JOBMANAGER_RPC_ADDRESS", configuration.get(ADDRESS)); + replaceVars.put("$VAR_DELETE_TABLE_DATA_ID", prepareDataForDeleteStatement()); } @BeforeEach @@ -290,6 +295,14 @@ private boolean containsTag(List contents, String tag) { } } + private static String prepareDataForDeleteStatement() { + List values = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + values.add(GenericRowData.of(i, StringData.fromString("b_" + i), i * 
2.0)); + } + return TestUpdateDeleteTableFactory.registerRowData(values); + } + private static String getInputFromPath(String sqlPath) throws IOException { URL url = CliClientITCase.class.getResource("/" + sqlPath); String in = IOUtils.toString(url, StandardCharsets.UTF_8); diff --git a/flink-table/flink-sql-client/src/test/resources/sql/delete.q b/flink-table/flink-sql-client/src/test/resources/sql/delete.q new file mode 100644 index 0000000000000..1604f27305dc2 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/resources/sql/delete.q @@ -0,0 +1,80 @@ +# delete.q - test delete statement +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# first set batch mode +SET 'execution.runtime-mode' = 'batch'; +[INFO] Execute statement succeed. +!info + +SET 'sql-client.execution.result-mode' = 'tableau'; +[INFO] Execute statement succeed. +!info + +SET 'table.dml-sync' = 'true'; +[INFO] Execute statement succeed. +!info + +# create a table first +CREATE TABLE t (a int, b string, c double) +WITH ( + 'connector' = 'test-update-delete', + 'data-id' = '$VAR_DELETE_TABLE_DATA_ID', + 'mix-delete' = 'true' +); +[INFO] Execute statement succeed. 
+!info + +# query the table first +SELECT * FROM t; ++---+-----+-----+ +| a | b | c | ++---+-----+-----+ +| 0 | b_0 | 0.0 | +| 1 | b_1 | 2.0 | +| 2 | b_2 | 4.0 | +| 3 | b_3 | 6.0 | +| 4 | b_4 | 8.0 | ++---+-----+-----+ +5 rows in set +!ok + +# delete the table with condition containing subquery which can't be pushed down, so that it'll submit a job +DELETE FROM t WHERE a >= (SELECT COUNT(1) FROM t WHERE c > 2); +[INFO] Complete execution of the SQL update statement. +!info + +# query the table +SELECT * FROM t; ++---+-----+-----+ +| a | b | c | ++---+-----+-----+ +| 0 | b_0 | 0.0 | +| 1 | b_1 | 2.0 | +| 2 | b_2 | 4.0 | ++---+-----+-----+ +3 rows in set +!ok + +# delete the table with filter push down +DELETE FROM t; ++---------------+ +| rows affected | ++---------------+ +| 3 | ++---------------+ +1 row in set +!ok diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index 0017db2fdf7f6..6ba1b54450096 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -67,6 +67,7 @@ import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.BeginStatementSetOperation; +import org.apache.flink.table.operations.DeleteFromFilterOperation; import org.apache.flink.table.operations.EndStatementSetOperation; import org.apache.flink.table.operations.LoadModuleOperation; import org.apache.flink.table.operations.ModifyOperation; @@ -506,6 +507,12 @@ private ResultFetcher callModifyOperations( OperationHandle handle, List modifyOperations) { TableResultInternal result = 
tableEnv.executeInternal(modifyOperations); + // DeleteFromFilterOperation doesn't have a JobClient + if (modifyOperations.size() == 1 + && modifyOperations.get(0) instanceof DeleteFromFilterOperation) { + return ResultFetcher.fromTableResult(handle, result, false); + } + JobID jobID = result.getJobClient() .orElseThrow( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index e7c5ea2789bdd..7c9f6f42dab1c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -824,9 +824,9 @@ private TableResultInternal executeInternal( deleteFromFilterOperation.getSupportsDeletePushDownSink().executeDeletion(); if (rows.isPresent()) { return TableResultImpl.builder() - .resultKind(ResultKind.SUCCESS) - .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) - .data(Arrays.asList(Row.of(String.valueOf(rows.get())), Row.of("OK"))) + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .schema(ResolvedSchema.of(Column.physical("rows affected", DataTypes.BIGINT()))) + .data(Collections.singletonList(Row.of(rows.get()))) .build(); } else { return TableResultImpl.TABLE_RESULT_OK; @@ -1183,6 +1183,6 @@ private boolean isRowLevelModification(Operation operation) { SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation; return sinkModifyOperation.isDelete() || sinkModifyOperation.isUpdate(); } - return true; + return false; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java index 
1a7005f3b427a..cd26a7b3fd477 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java @@ -26,11 +26,10 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; - import javax.annotation.Nonnull; import javax.annotation.Nullable; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java index ad706fc9466bf..b918b7af3a0bd 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java @@ -74,7 +74,7 @@ public void testDeletePushDown() throws Exception { dataId)); // it only contains equal expression, should be pushed down List rows = toRows(tEnv().executeSql("DELETE FROM t where a = 1")); - assertThat(rows.toString()).isEqualTo("[+I[1], +I[OK]]"); + assertThat(rows.toString()).isEqualTo("[+I[1]]"); rows = toRows(tEnv().executeSql("SELECT * FROM t")); assertThat(rows.toString()) .isEqualTo("[+I[0, b_0, 0.0], +I[2, b_2, 4.0], +I[3, b_3, 6.0], +I[4, b_4, 8.0]]"); @@ -130,7 +130,7 @@ public void testMixDelete() throws Exception { // should fall back to 
delete push down rows = toRows(tEnv().executeSql("DELETE FROM t")); - assertThat(rows.toString()).isEqualTo("[+I[3], +I[OK]]"); + assertThat(rows.toString()).isEqualTo("[+I[3]]"); rows = toRows(tEnv().executeSql("SELECT * FROM t")); assertThat(rows).isEmpty(); }