diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java index ae3c815789fd38..aa2cd3a9331f35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.StorageBackend.StorageType; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.job.cdc.DataSourceConfigKeys; @@ -53,14 +54,18 @@ public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunctio public static final String JOB_ID_KEY = "job.id"; public static final String TASK_ID_KEY = "task.id"; public static final String META_KEY = "meta"; + public static final String INCLUDE_DELETE_SIGN = "include_delete_sign"; + private final boolean includeDeleteSign; public CdcStreamTableValuedFunction(Map properties) throws AnalysisException { validate(properties); + includeDeleteSign = Boolean.parseBoolean(properties.getOrDefault(INCLUDE_DELETE_SIGN, "false")); processProps(properties); } private void processProps(Map properties) throws AnalysisException { Map copyProps = new HashMap<>(properties); + copyProps.remove(INCLUDE_DELETE_SIGN); copyProps.put("format", "json"); // Standalone TVF: random jobId. TVF-in-job: job.id injected by rewriteTvfParams. @@ -157,6 +162,7 @@ private void validate(Map properties) throws AnalysisException { validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE); validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_PARALLELISM); validateBooleanIfPresent(properties, DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL); + validateBooleanIfPresent(properties, INCLUDE_DELETE_SIGN); // TVF entrypoint shares server_id checks with the from-to path's validateSource. try { DataSourceConfigValidator.validateServerIdConfig(properties); @@ -214,7 +220,11 @@ public List getTableColumns() throws AnalysisException { if (!jdbcClient.isTableExist(database, table)) { throw new AnalysisException("Table does not exist: " + table); } - return jdbcClient.getColumnsFromJdbc(database, table); + List columns = new ArrayList<>(jdbcClient.getColumnsFromJdbc(database, table)); + if (includeDeleteSign) { + columns.add(new Column(Column.DELETE_SIGN, PrimitiveType.TINYINT, false)); + } + return columns; } finally { jdbcClient.closeClient(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunctionTest.java new file mode 100644 index 00000000000000..1be26fa69daa7f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunctionTest.java @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CdcStreamTableValuedFunctionTest { + + @Test + public void testDeleteSignIsExcludedByDefault() throws Exception { + List columns = getTableColumns(baseProperties()); + + Assert.assertEquals(1, columns.size()); + Assert.assertEquals("id", columns.get(0).getName()); + } + + @Test + public void testDeleteSignIsIncludedWhenEnabled() throws Exception { + Map properties = baseProperties(); + properties.put(CdcStreamTableValuedFunction.INCLUDE_DELETE_SIGN, "true"); + + List columns = getTableColumns(properties); + + Assert.assertEquals(2, columns.size()); + Column deleteSign = columns.get(1); + Assert.assertEquals(Column.DELETE_SIGN, deleteSign.getName()); + Assert.assertEquals(PrimitiveType.TINYINT, deleteSign.getType().getPrimitiveType()); + Assert.assertFalse(deleteSign.isAllowNull()); + } + + @Test + public void testInvalidIncludeDeleteSignIsRejected() { + Map properties = baseProperties(); + properties.put(CdcStreamTableValuedFunction.INCLUDE_DELETE_SIGN, "invalid"); + + AnalysisException exception = Assert.assertThrows(AnalysisException.class, + () -> new CdcStreamTableValuedFunction(properties)); + + Assert.assertTrue(exception.getMessage().contains("include_delete_sign")); + } + + private List getTableColumns(Map properties) throws Exception { + JdbcClient jdbcClient = Mockito.mock(JdbcClient.class); + List sourceColumns = new ArrayList<>(); + sourceColumns.add(new Column("id", PrimitiveType.INT)); + Mockito.when(jdbcClient.isTableExist("test_db", "test_table")).thenReturn(true); + Mockito.when(jdbcClient.getColumnsFromJdbc("test_db", "test_table")).thenReturn(sourceColumns); + + try (MockedStatic utils = Mockito.mockStatic(StreamingJobUtils.class)) { + utils.when(() -> StreamingJobUtils.getJdbcClient( + Mockito.eq(DataSourceType.MYSQL), Mockito.anyMap())) + .thenReturn(jdbcClient); + utils.when(() -> StreamingJobUtils.getRemoteDbName( + Mockito.eq(DataSourceType.MYSQL), Mockito.anyMap())) + .thenReturn("test_db"); + CdcStreamTableValuedFunction function = new CdcStreamTableValuedFunction(properties); + return function.getTableColumns(); + } + } + + private Map baseProperties() { + Map properties = new HashMap<>(); + properties.put(DataSourceConfigKeys.TYPE, "mysql"); + properties.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://localhost:3306/test_db"); + properties.put(DataSourceConfigKeys.DATABASE, "test_db"); + properties.put(DataSourceConfigKeys.TABLE, "test_table"); + properties.put(DataSourceConfigKeys.OFFSET, "initial"); + return properties; + } +} diff --git a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_delete_sign.out b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_delete_sign.out new file mode 100644 index 00000000000000..1328d4ef212fd2 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_delete_sign.out @@ -0,0 +1,21 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !snapshot_data -- +A1 1 +B1 2 + +-- !incremental_data -- +A1 1 0 +B1 2 0 +C1 3 0 +D1 4 0 + +-- !after_update -- +A1 1 0 +B1 2 0 +C1 30 0 +D1 4 0 + +-- !after_delete -- +A1 1 0 +B1 2 0 +C1 30 0 diff --git a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_delete_sign.out b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_delete_sign.out new file mode 100644 index 00000000000000..1328d4ef212fd2 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_delete_sign.out @@ -0,0 +1,21 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !snapshot_data -- +A1 1 +B1 2 + +-- !incremental_data -- +A1 1 0 +B1 2 0 +C1 3 0 +D1 4 0 + +-- !after_update -- +A1 1 0 +B1 2 0 +C1 30 0 +D1 4 0 + +-- !after_delete -- +A1 1 0 +B1 2 0 +C1 30 0 diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_delete_sign.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_delete_sign.groovy new file mode 100644 index 00000000000000..9eef49daac6b5d --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_delete_sign.groovy @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +/** + * Test CDC stream TVF with delete sign enabled for MySQL primary-key table. + * + * Scenario: + * 1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced. + * 2. Incremental INSERT (C1, D1). + * 3. Incremental UPDATE (C1 age -> 30). + * 4. Incremental DELETE (D1 removed). + */ +suite("test_streaming_job_cdc_stream_mysql_delete_sign", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_job_cdc_stream_mysql_ds_name" + def currentDb = (sql "select database()")[0][0] + def dorisTable = "test_streaming_job_cdc_stream_mysql_ds_tbl" + def mysqlDb = "test_cdc_db" + def mysqlTable = "test_streaming_job_cdc_stream_mysql_ds_src" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${dorisTable} force""" + + sql """ + CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} ( + `name` varchar(200) NULL, + `age` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`name`) + DISTRIBUTED BY HASH(`name`) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // prepare source table with primary key and pre-existing snapshot data + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}""" + sql """CREATE TABLE ${mysqlDb}.${mysqlTable} ( + `name` varchar(200) NOT NULL, + `age` int DEFAULT NULL, + PRIMARY KEY (`name`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES ('A1', 1)""" + sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES ('B1', 2)""" + } + + // create streaming job via cdc_stream TVF with delete sign enabled + sql """ + CREATE JOB ${jobName} + ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age, __DORIS_DELETE_SIGN__) + SELECT name, age, __DORIS_DELETE_SIGN__ FROM cdc_stream( + "type" = "mysql", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "table" = "${mysqlTable}", + "offset" = "initial", + "snapshot_split_key" = "name", + "include_delete_sign" = "true" + ) + """ + + // wait for at least one snapshot task to succeed + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'""" + log.info("SucceedTaskCount: " + cnt) + cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 1 + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + // verify snapshot data + qt_snapshot_data """ SELECT name, age FROM ${currentDb}.${dorisTable} ORDER BY name """ + + // insert incremental rows + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES ('C1', 3)""" + sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES ('D1', 4)""" + } + + // wait for binlog tasks to pick up the new rows + try { + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def rows = sql """SELECT count(1) FROM ${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')""" + log.info("incremental rows: " + rows) + (rows.get(0).get(0) as int) == 2 + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + qt_incremental_data """ SELECT name, age, __DORIS_DELETE_SIGN__ FROM ${currentDb}.${dorisTable} ORDER BY name """ + + // verify incremental UPDATE + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """UPDATE ${mysqlDb}.${mysqlTable} SET age = 30 WHERE name = 'C1'""" + } + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def rows = sql """SELECT age FROM ${currentDb}.${dorisTable} WHERE name = 'C1'""" + rows.size() == 1 && (rows.get(0).get(0) as int) == 30 + }) + + qt_after_update """ SELECT name, age, __DORIS_DELETE_SIGN__ FROM ${currentDb}.${dorisTable} ORDER BY name """ + + // verify incremental DELETE + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """DELETE FROM ${mysqlDb}.${mysqlTable} WHERE name = 'D1'""" + } + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def rows = sql """SELECT count(1) FROM ${currentDb}.${dorisTable} WHERE name = 'D1'""" + (rows.get(0).get(0) as int) == 0 + }) + + qt_after_delete """ SELECT name, age, __DORIS_DELETE_SIGN__ FROM ${currentDb}.${dorisTable} ORDER BY name """ + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${dorisTable} force""" + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_delete_sign.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_delete_sign.groovy new file mode 100644 index 00000000000000..97ad3c362da482 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_delete_sign.groovy @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +/** + * Test CDC stream TVF with delete sign enabled for PostgreSQL primary-key table. + * + * Scenario: + * 1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced. + * 2. Incremental INSERT (C1, D1). + * 3. Incremental UPDATE (C1 age -> 30). + * 4. Incremental DELETE (D1 removed). + */ +suite("test_streaming_job_cdc_stream_postgres_delete_sign", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_job_cdc_stream_postgres_ds_name" + def currentDb = (sql "select database()")[0][0] + def dorisTable = "test_streaming_job_cdc_stream_postgres_ds_tbl" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + def pgTable = "test_streaming_job_cdc_stream_postgres_ds_src" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${dorisTable} force""" + + sql """ + CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} ( + `name` varchar(200) NULL, + `age` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`name`) + DISTRIBUTED BY HASH(`name`) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // prepare source table with primary key and pre-existing snapshot data + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} ( + "name" varchar(200) PRIMARY KEY, + "age" int2 + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('A1', 1)""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('B1', 2)""" + } + + // create streaming job via cdc_stream TVF with delete sign enabled + sql """ + CREATE JOB ${jobName} + ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age, __DORIS_DELETE_SIGN__) + SELECT name, age, __DORIS_DELETE_SIGN__ FROM cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${pgTable}", + "offset" = "initial", + "snapshot_split_size" = "1", + "include_delete_sign" = "true" + ) + """ + + // wait for at least two snapshot tasks to succeed (split_size=1 → 2 splits) + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'""" + log.info("SucceedTaskCount: " + cnt) + cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2 + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + // verify snapshot data + qt_snapshot_data """ SELECT name, age FROM ${currentDb}.${dorisTable} ORDER BY name """ + + // insert incremental rows + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('C1', 3)""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('D1', 4)""" + } + + // wait for binlog tasks to pick up the new rows + try { + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def rows = sql """SELECT count(1) FROM ${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')""" + log.info("incremental rows: " + rows) + (rows.get(0).get(0) as int) == 2 + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + qt_incremental_data """ SELECT name, age, __DORIS_DELETE_SIGN__ FROM ${currentDb}.${dorisTable} ORDER BY name """ + + // verify incremental UPDATE + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """UPDATE ${pgDB}.${pgSchema}.${pgTable} SET age = 30 WHERE name = 'C1'""" + } + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def rows = sql """SELECT age FROM ${currentDb}.${dorisTable} WHERE name = 'C1'""" + rows.size() == 1 && (rows.get(0).get(0) as int) == 30 + }) + + qt_after_update """ SELECT name, age, __DORIS_DELETE_SIGN__ FROM ${currentDb}.${dorisTable} ORDER BY name """ + + // verify incremental DELETE + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DELETE FROM ${pgDB}.${pgSchema}.${pgTable} WHERE name = 'D1'""" + } + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def rows = sql """SELECT count(1) FROM ${currentDb}.${dorisTable} WHERE name = 'D1'""" + (rows.get(0).get(0) as int) == 0 + }) + + qt_after_delete """ SELECT name, age, __DORIS_DELETE_SIGN__ FROM ${currentDb}.${dorisTable} ORDER BY name """ + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${dorisTable} force""" + } +}