From 29aa2861b3bcd327a3c6cf6c4d7dba4ed4e17381 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 22 May 2026 11:45:21 +0800 Subject: [PATCH 1/7] [regression-test](streaming-job) add cdc operational cases for offset modes and pg slot lifecycle --- ...st_streaming_mysql_job_offset_earliest.out | 9 + ...streaming_mysql_job_offset_earliest.groovy | 130 +++++++++++ ...g_postgres_job_drop_during_snapshot.groovy | 148 ++++++++++++ ..._streaming_postgres_job_publication.groovy | 124 +++++++++- ...aming_postgres_job_slot_lsn_advance.groovy | 213 ++++++++++++++++++ ...tgres_job_special_offset_restart_fe.groovy | 173 ++++++++++++++ 6 files changed, 796 insertions(+), 1 deletion(-) create mode 100644 regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out new file mode 100644 index 00000000000000..54d966bff91622 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_after_earliest_replay -- +1 alice +2 bob + +-- !select_after_earliest_incr -- +1 alice_upd +3 charlie + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy new file mode 100644 index 00000000000000..f114e423633465 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_offset_earliest.groovy @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// offset=earliest is only valid for MySQL. It skips the snapshot phase entirely and +// replays binlog events from the oldest position available on the server. Two +// invariants this case guards: +// 1. Pre-existing rows whose INSERT events are still in binlog are picked up via +// the binlog path (no JDBC snapshot is run). +// 2. Subsequent binlog DML (INSERT/UPDATE/DELETE) lands as usual. +// The table name is randomized so binlog events from prior CI runs of this case +// (with the same fixed name) cannot leak in and skew the final state — debezium +// include_tables filtering is the only line of defense and it matches by name. +suite("test_streaming_mysql_job_offset_earliest", + "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_offset_earliest_name" + def currentDb = (sql "select database()")[0][0] + def suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8) + def table1 = "earliest_offset_mysql_tbl_${suffix}" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // Two INSERTs land in binlog BEFORE the job exists. earliest must replay them. + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `id` int NOT NULL, + `name` varchar(100) DEFAULT NULL, + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 'alice')""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (2, 'bob')""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "earliest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + cnt.size() == 1 && cnt.get(0).get(0) == 2 + }) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + qt_select_after_earliest_replay """ SELECT id, name FROM ${currentDb}.${table1} ORDER BY id """ + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'charlie')""" + sql """UPDATE ${mysqlDb}.${table1} SET name='alice_upd' WHERE id=1""" + sql """DELETE FROM ${mysqlDb}.${table1} WHERE id=2""" + } + + try { + Awaitility.await().atMost(180, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + def upd = sql """SELECT name FROM ${currentDb}.${table1} WHERE id=1""" + def del = sql """SELECT count(*) FROM ${currentDb}.${table1} WHERE id=2""" + def updName = upd.size() == 0 ? null : upd.get(0).get(0) + log.info("incr cnt=${cnt} upd=${updName} del=${del}") + cnt.get(0).get(0) == 2 && updName == 'alice_upd' && del.get(0).get(0) == 0 + }) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job (incr): " + showjob) + log.info("show task (incr): " + showtask) + throw ex + } + + qt_select_after_earliest_incr """ SELECT id, name FROM ${currentDb}.${table1} ORDER BY id """ + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + } + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy new file mode 100644 index 00000000000000..1856a5eb9f3deb --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_drop_during_snapshot.groovy @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// DROP JOB during the snapshot phase (chunks still being dispatched) must still +// clean up auto-managed PG resources. test_streaming_postgres_job_publication only +// covers DROP JOB after the job has reached a steady state — the in-flight drop +// path goes through a different cancel/cleanup branch and historically leaks the +// replication slot if cdc_client cancellation races with the FE-side resource drop. +suite("test_streaming_postgres_job_drop_during_snapshot", + "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_pg_drop_during_snapshot_job" + def currentDb = (sql "select database()")[0][0] + def table1 = "drop_during_snapshot_pg_tbl" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + def totalRows = 300 + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" varchar(20) PRIMARY KEY, + "name" varchar(200) + )""" + StringBuilder sb = new StringBuilder() + sb.append("INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, name) VALUES ") + for (int i = 1; i <= totalRows; i++) { + if (i > 1) sb.append(", ") + String key = "k_" + String.format("%05d", i) + sb.append("('${key}', 'name_${i}')") + } + sql sb.toString() + } + + // Small split_size + single parallelism makes the snapshot slow enough that + // we can reliably catch it mid-flight before DROP. + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "initial", + "snapshot_split_size" = "5", + "snapshot_parallelism" = "1" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + // Capture jobId for derived slot/publication names (matches the + // doris_cdc_${jobId} / doris_pub_${jobId} pattern in DataSourceConfigKeys). + def jobRow = sql """select Id from jobs("type"="insert") where Name='${jobName}'""" + assert jobRow.size() == 1 : "job did not register" + def jobId = jobRow.get(0).get(0).toString() + def expectedSlot = "doris_cdc_${jobId}" + def expectedPub = "doris_pub_${jobId}" + + // Step 1: catch the job mid-snapshot. Confirm slot+publication actually exist + // on PG before issuing DROP — otherwise the cleanup assertion below is vacuous. + try { + Awaitility.await().atMost(180, SECONDS).pollInterval(1, SECONDS).until({ + def succeed = sql """select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}'""" + def cnt = sql """SELECT COUNT(*) FROM ${currentDb}.${table1}""" + log.info("pre-drop succeed=${succeed} rows=${cnt}") + succeed.size() == 1 && + Integer.parseInt(succeed.get(0).get(0).toString()) >= 2 && + cnt.get(0).get(0) < totalRows + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + throw ex + } + def rowsBeforeDrop = sql("""SELECT COUNT(*) FROM ${currentDb}.${table1}""").get(0).get(0) as int + assert rowsBeforeDrop < totalRows : "snapshot finished before we could DROP — case is moot" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def slotBefore = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${expectedSlot}'""" + def pubBefore = sql """SELECT COUNT(1) FROM pg_publication WHERE pubname = '${expectedPub}'""" + log.info("pre-drop pg: slot=${slotBefore} pub=${pubBefore}") + assert slotBefore[0][0] == 1 : "auto slot must exist before DROP — guard against fixture regression" + assert pubBefore[0][0] == 1 : "auto publication must exist before DROP" + } + + // Step 2: DROP while snapshot tasks are still being dispatched. + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + // Step 3: slot and publication must both be cleaned up despite in-flight task + // cancellation. Polling avoids flakiness on slower environments where the + // FE drop-resources phase runs asynchronously with task cancel. + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).untilAsserted { + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def slotAfter = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${expectedSlot}'""" + def pubAfter = sql """SELECT COUNT(1) FROM pg_publication WHERE pubname = '${expectedPub}'""" + log.info("post-drop pg: slot=${slotAfter} pub=${pubAfter}") + assert slotAfter[0][0] == 0 : "slot ${expectedSlot} leaked after DROP during snapshot" + assert pubAfter[0][0] == 0 : "publication ${expectedPub} leaked after DROP during snapshot" + } + } + + // Step 4: job row must be gone on FE side. + def jobCount = sql """select count(1) from jobs("type"="insert") where Name = '${jobName}'""" + assert jobCount.get(0).get(0) == 0 : "job row not removed after DROP" + + // Cleanup + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + } + sql """drop table if exists ${currentDb}.${table1} force""" + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy index 1d29b659a60751..d38d07d8c4e1ed 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy @@ -20,11 +20,15 @@ import org.awaitility.Awaitility import static java.util.concurrent.TimeUnit.SECONDS -// Verify per-resource ownership of PG replication slot and publication: +// Verify per-resource ownership of PG replication slot and publication, plus +// the fail-fast validation paths in PostgresResourceValidator: // Test 1: auto-generated slot & publication — created on job start, cleaned up on drop // Test 2: user-provided slot & publication — Doris uses but never drops them // Test 3: mixed (user publication + auto slot) — only auto slot is dropped on job deletion // Test 4: slot_name / publication_name are immutable via ALTER JOB +// Test 5: user-provided slot_name that does not exist → actionable error +// Test 6: user-provided publication_name that does not exist → actionable error +// Test 7: user-provided publication exists but is missing required tables → actionable error suite("test_streaming_postgres_job_publication", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { def jobName = "test_pg_pub_job" def currentDb = (sql "select database()")[0][0] @@ -401,6 +405,124 @@ suite("test_streaming_postgres_job_publication", "p0,external,pg,external_docker sql """DROP JOB IF EXISTS where jobname = '${alterJob}'""" + // PostgresResourceValidator fail-fast paths: every CREATE JOB goes through + // the validator, and operators most often hit it via three misconfigurations + // below. Each error must be actionable (mention the missing resource by name + // and hint at the fix) so support can read the SQL error and move on. + def validatorJob = "test_pg_pub_validator_job" + sql """DROP JOB IF EXISTS where jobname = '${validatorJob}'""" + // Recreate the PG tables — Test 4 left them present but the alterJob is gone. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + } + + // ========== Test 5: user-provided slot_name does not exist ========== + // Pre-create a valid publication so the only fail-fast path is the slot. + def goodPub5 = "test_pg_pub_validator_pub_5" + def missingSlot5 = "test_pg_pub_validator_missing_slot_5" + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP PUBLICATION IF EXISTS ${goodPub5}""" + sql """CREATE PUBLICATION ${goodPub5} FOR TABLE ${pgDB}.${pgSchema}.${table1}""" + def existing = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${missingSlot5}'""" + if (existing[0][0] != 0) { + sql """SELECT pg_drop_replication_slot('${missingSlot5}')""" + } + } + test { + sql """CREATE JOB ${validatorJob} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "slot_name" = "${missingSlot5}", + "publication_name" = "${goodPub5}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "replication slot does not exist" + } + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP PUBLICATION IF EXISTS ${goodPub5}""" + } + + // ========== Test 6: user-provided publication_name does not exist ========== + def missingPub6 = "test_pg_pub_validator_missing_pub_6" + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP PUBLICATION IF EXISTS ${missingPub6}""" + } + test { + sql """CREATE JOB ${validatorJob} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "publication_name" = "${missingPub6}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "publication does not exist" + } + + // ========== Test 7: user pub exists but does not include the requested tables ========== + // Publication covers ${table2} only, but the job asks for ${table1} → validator + // must reject with the actionable "ALTER PUBLICATION ... ADD TABLE" hint. + def partialPub7 = "test_pg_pub_validator_partial_pub_7" + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP PUBLICATION IF EXISTS ${partialPub7}""" + sql """CREATE PUBLICATION ${partialPub7} FOR TABLE ${pgDB}.${pgSchema}.${table2}""" + } + test { + sql """CREATE JOB ${validatorJob} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "publication_name" = "${partialPub7}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "missing required tables" + } + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP PUBLICATION IF EXISTS ${partialPub7}""" + } + // Cleanup PG tables connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy new file mode 100644 index 00000000000000..38ca8e7c9ad39d --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// pg_replication_slots.confirmed_flush_lsn must advance as cdc_client consumes +// WAL, must NOT advance while the job is paused (after the in-flight task +// finishes its final ack), and must resume advancing after RESUME. A stuck +// confirmed_flush_lsn means WAL piles up on the source and eventually exhausts +// disk — operationally this is the single most important health signal for a +// long-running PG CDC job. See [[project_pgcdc_task_offset_stuck]] for the +// customer-site bug class this guards against. +// +// PAUSE semantics (verified in code): +// PipelineCoordinator.writeRecords runs one maxInterval window per task, +// then commitSourceOffset (acks LSN to PG) and finishSplitRecords (closes +// the replication connection). PAUSE only stops the FE from scheduling the +// next task — the in-flight task runs to its natural end and acks LSN one +// last time. After that final ack, no consumer exists and no further LSN +// advancement is possible regardless of WAL growth on the source. +// Hence the test waits for LSN to settle after PAUSE before asserting it stays +// frozen. max_interval=3 keeps the in-flight task short so the settle window +// stays under 30s. +// +// Uses a user-provided slot/publication for two reasons: (1) the slot name is +// known up front so we don't have to fish jobId out of the jobs() view, and +// (2) DROP JOB then leaves the slot intact, so post-test cleanup is explicit. +suite("test_streaming_postgres_job_slot_lsn_advance", + "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_pg_slot_lsn_advance_job" + def currentDb = (sql "select database()")[0][0] + def table1 = "slot_lsn_advance_pg_tbl" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + def userSlot = "slot_lsn_advance_user_slot" + def userPub = "slot_lsn_advance_user_pub" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int PRIMARY KEY, + "name" varchar(200) + )""" + sql """DROP PUBLICATION IF EXISTS ${userPub}""" + sql """CREATE PUBLICATION ${userPub} FOR TABLE ${pgDB}.${pgSchema}.${table1}""" + def existing = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + if (existing[0][0] != 0) { + sql """SELECT pg_drop_replication_slot('${userSlot}')""" + } + sql """SELECT pg_create_logical_replication_slot('${userSlot}', 'pgoutput')""" + } + + // offset=latest skips snapshot and goes straight to streaming — that is + // where confirmed_flush_lsn is meaningful. max_interval=3 keeps each + // task short so PAUSE settles fast. + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "slot_name" = "${userSlot}", + "publication_name" = "${userPub}", + "offset" = "latest" + ) + PROPERTIES ("max_interval" = "3") + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + Awaitility.await().atMost(120, SECONDS).pollInterval(1, SECONDS).until({ + def st = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + st.size() == 1 && st.get(0).get(0) == "RUNNING" + }) + + // Helper to read confirmed_flush_lsn as BigInteger (Long would overflow + // on production-scale wraparound; BigInteger is safe). + Closure readLsn = { + BigInteger out = BigInteger.valueOf(-1L) + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def r = sql """SELECT confirmed_flush_lsn::text FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + if (r.size() == 1 && r.get(0).get(0) != null) { + def parts = r.get(0).get(0).toString().split("/") + out = new BigInteger(parts[0], 16).shiftLeft(32).add(new BigInteger(parts[1], 16)) + } + } + out + } + + def lsn0 = readLsn() + log.info("initial confirmed_flush_lsn=${lsn0}") + assert lsn0.signum() > 0 : "user slot ${userSlot} confirmed_flush_lsn is null/invalid" + + // ===== Phase 1: steady-state advancement ===== + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + for (int i = 1; i <= 20; i++) { + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i}, 'name_${i}')""" + } + } + Awaitility.await().atMost(180, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + cnt.size() == 1 && cnt.get(0).get(0) >= 20 + }) + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def cur = readLsn() + log.info("phase1 poll lsn=${cur} (init=${lsn0})") + cur > lsn0 + }) + + // ===== Phase 2: PAUSE — in-flight task acks once then LSN must freeze ===== + sql """PAUSE JOB where jobname = '${jobName}'""" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def st = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + st.size() == 1 && st.get(0).get(0) == "PAUSED" + }) + + // Wait for LSN to settle: N consecutive equal samples confirm the in-flight + // task has completed its final commitSourceOffset and the connection is closed. + BigInteger lastLsn = BigInteger.valueOf(-1L) + int stable = 0 + final int requiredStable = 4 + Awaitility.await().atMost(120, SECONDS).pollInterval(4, SECONDS).until({ + BigInteger cur = readLsn() + if (cur == lastLsn) { + stable++ + } else { + stable = 1 + lastLsn = cur + } + log.info("pause-settle lsn=${cur} stable=${stable}/${requiredStable}") + stable >= requiredStable + }) + def lsnAtPauseSettled = lastLsn + + // Generate WAL while paused. With no consumer the slot's + // confirmed_flush_lsn must remain frozen even though WAL is growing. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + for (int i = 100; i < 120; i++) { + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i}, 'paused_${i}')""" + } + } + sleep(15000) + def lsnDuringPause = readLsn() + log.info("paused: settled=${lsnAtPauseSettled} duringPause=${lsnDuringPause}") + assert lsnDuringPause == lsnAtPauseSettled : + "confirmed_flush_lsn advanced while paused with no consumer: " + + "${lsnAtPauseSettled} -> ${lsnDuringPause}" + + // ===== Phase 3: RESUME — LSN must advance past the paused snapshot ===== + sql """RESUME JOB where jobname = '${jobName}'""" + Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until({ + def st = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + st.size() == 1 && st.get(0).get(0) == "RUNNING" + }) + Awaitility.await().atMost(180, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + cnt.size() == 1 && cnt.get(0).get(0) >= 40 + }) + Awaitility.await().atMost(180, SECONDS).pollInterval(2, SECONDS).until({ + def cur = readLsn() + log.info("phase3 poll lsn=${cur} (pause=${lsnAtPauseSettled})") + cur > lsnAtPauseSettled + }) + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + // User-provided slot/pub survive DROP JOB; clean up manually. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def slotStillThere = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + assert slotStillThere[0][0] == 1 : "user-provided slot must not be dropped by Doris" + sql """SELECT pg_drop_replication_slot('${userSlot}')""" + sql """DROP PUBLICATION IF EXISTS ${userPub}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + } + sql """drop table if exists ${currentDb}.${table1} force""" + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy new file mode 100644 index 00000000000000..1b0570a828db1b --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Mirror of test_streaming_mysql_job_special_offset_restart_fe for the PG path: +// CREATE JOB with a JSON LSN offset, sync, restart FE, verify currentOffset +// survives the replay and subsequent binlog DML still lands. +// +// PG-specific wrinkle: an auto-managed slot starts retaining WAL only at slot +// creation time, so a CREATE-with-past-LSN against an auto slot would fail +// because PG has already purged the requested LSN. We therefore pre-create a +// user-provided slot first — that pins the WAL retention horizon back in time +// far enough to make the LSN we capture valid. +suite("test_streaming_postgres_job_special_offset_restart_fe", + "docker,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_pg_special_offset_restart_fe" + def options = new ClusterOptions() + options.setFeNum(1) + options.cloudMode = null + + docker(options) { + def currentDb = (sql "select database()")[0][0] + def table1 = "special_offset_restart_pg_tbl" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + def userSlot = "special_offset_restart_slot" + def userPub = "special_offset_restart_pub" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // Setup: fresh PG table + fresh user slot/pub. Slot must be created + // BEFORE the LSN we capture below, otherwise PG would have purged + // the WAL covering that LSN by the time the job tries to replay it. + def lsnAtCreate = "" + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int PRIMARY KEY, + "name" varchar(100) + )""" + sql """DROP PUBLICATION IF EXISTS ${userPub}""" + sql """CREATE PUBLICATION ${userPub} FOR TABLE ${pgDB}.${pgSchema}.${table1}""" + def existing = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + if (existing[0][0] != 0) { + sql """SELECT pg_drop_replication_slot('${userSlot}')""" + } + sql """SELECT pg_create_logical_replication_slot('${userSlot}', 'pgoutput')""" + + // Capture LSN AFTER slot creation, BEFORE the INSERTs the job will read. + def lsnRows = sql """SELECT pg_current_wal_lsn()::text""" + def lsnStr = lsnRows[0][0].toString() + def parts = lsnStr.split("/") + def high = Long.parseLong(parts[0], 16) + def low = Long.parseLong(parts[1], 16) + lsnAtCreate = String.valueOf((high << 32) + low) + log.info("CREATE LSN mark: ${lsnStr} -> numeric: ${lsnAtCreate}") + + // Inserts after the mark: these are what the job should stream. + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 'alice')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 'bob')""" + } + + def offsetJson = """{"lsn":"${lsnAtCreate}"}""" + log.info("Creating job with LSN offset: ${offsetJson}") + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "slot_name" = "${userSlot}", + "publication_name" = "${userPub}", + "offset" = '${offsetJson}' + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def succeed = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'""" + def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + log.info("pre-restart succeed=${succeed} rows=${cnt}") + succeed.size() == 1 && + (succeed.get(0).get(0) as int) >= 1 && + cnt.size() == 1 && cnt.get(0).get(0) == 2 + }) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + def jobInfoBefore = sql """ + select loadStatistic, status, currentOffset from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfoBefore: " + jobInfoBefore) + assert jobInfoBefore.get(0).get(1) == "RUNNING" + + // Restart FE — currentOffset must replay cleanly from BDBJE editlog + txn attachments. + cluster.restartFrontends() + sleep(60000) + context.reconnectFe() + + def jobInfoAfter = sql """ + select loadStatistic, status, currentOffset from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfoAfter: " + jobInfoAfter) + assert jobInfoAfter.get(0).get(1) == "RUNNING" + assert jobInfoAfter.get(0).get(2) == jobInfoBefore.get(0).get(2) : + "currentOffset diverged after restart: before=${jobInfoBefore.get(0).get(2)} after=${jobInfoAfter.get(0).get(2)}" + + // Post-restart binlog still lands. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3, 'charlie')""" + } + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + result[0][0] >= 3 + }) + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + // User-provided slot/pub must survive DROP JOB; clean up manually. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def slotStillThere = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + assert slotStillThere[0][0] == 1 : + "user-provided slot ${userSlot} must not be dropped by Doris" + sql """SELECT pg_drop_replication_slot('${userSlot}')""" + sql """DROP PUBLICATION IF EXISTS ${userPub}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + } + } + } +} From 812a5395bb2d2d2658952fa0b2f4caf9b94e7565 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 22 May 2026 14:25:41 +0800 Subject: [PATCH 2/7] [regression-test](streaming-job) poll slot inactive before pg_drop_replication_slot --- ...t_streaming_postgres_job_slot_lsn_advance.groovy | 13 ++++++++++++- ...ng_postgres_job_special_offset_restart_fe.groovy | 11 +++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy index 38ca8e7c9ad39d..83e9382f0728f7 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy @@ -200,7 +200,18 @@ suite("test_streaming_postgres_job_slot_lsn_advance", sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" - // User-provided slot/pub survive DROP JOB; clean up manually. + // User-provided slot/pub survive DROP JOB; clean up manually. After DROP JOB + // the cdc_client may still be winding down its replication connection — PG + // rejects pg_drop_replication_slot on an active slot, so poll active=false + // before issuing the drop. + Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until({ + boolean inactive = false + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def r = sql """SELECT active FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + inactive = r.size() == 1 && r.get(0).get(0) == false + } + inactive + }) connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { def slotStillThere = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" assert slotStillThere[0][0] == 1 : "user-provided slot must not be dropped by Doris" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy index 1b0570a828db1b..37a607c960e421 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy @@ -160,6 +160,17 @@ suite("test_streaming_postgres_job_special_offset_restart_fe", sql """drop table if exists ${currentDb}.${table1} force""" // User-provided slot/pub must survive DROP JOB; clean up manually. + // After DROP JOB the cdc_client may still be winding down its replication + // connection — PG rejects pg_drop_replication_slot on an active slot, so + // poll active=false before issuing the drop. + Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until({ + boolean inactive = false + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + def r = sql """SELECT active FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" + inactive = r.size() == 1 && r.get(0).get(0) == false + } + inactive + }) connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { def slotStillThere = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'""" assert slotStillThere[0][0] == 1 : From 266b2f7cd84f210952ad268bb7a69b4667da259d Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 22 May 2026 17:30:52 +0800 Subject: [PATCH 3/7] [fix](streaming-job) guard cdc_client reader cleanup against concurrent close --- .../reader/JdbcIncrementalSourceReader.java | 49 +++++++++---------- .../reader/mysql/MySqlSourceReader.java | 43 +++++++++------- ...aming_postgres_job_slot_lsn_advance.groovy | 2 +- 3 files changed, 50 insertions(+), 44 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 826866e5885ed1..8a2273029bad2d 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -63,13 +63,14 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -95,11 +96,11 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade Fetcher, SnapshotSplitState>> snapshotReaderContexts; - private Set completedSplitIds = new HashSet<>(); + private Set completedSplitIds = ConcurrentHashMap.newKeySet(); // Parallel polling support private ExecutorService pollExecutor; - private List> activePollFutures; + private volatile List> activePollFutures; // Stream/binlog reader (single reader for stream split) private Fetcher streamReader; @@ -109,7 +110,7 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade public JdbcIncrementalSourceReader() { this.serializer = new DebeziumJsonDeserializer(); - this.snapshotReaderContexts = new ArrayList<>(); + this.snapshotReaderContexts = new CopyOnWriteArrayList<>(); } @Override @@ -285,7 +286,7 @@ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throw } /** Prepare snapshot splits (unified handling for single or multiple splits) */ - private SplitReadResult prepareSnapshotSplits( + private synchronized SplitReadResult prepareSnapshotSplits( List splits, JobBaseRecordRequest baseReq) throws Exception { @@ -387,7 +388,7 @@ private SplitReadResult prepareSnapshotSplits( } /** Prepare stream split */ - private SplitReadResult prepareStreamSplit( + private synchronized SplitReadResult prepareStreamSplit( Map offsetMeta, JobBaseRecordRequest baseReq) throws Exception { // Load tableSchemas from FE if available (avoids re-discover on restart) tryLoadTableSchemasFromRequest(baseReq); @@ -443,7 +444,7 @@ private SplitReadResult prepareStreamSplit( } @Override - public Iterator pollRecords() throws Exception { + public synchronized Iterator pollRecords() throws Exception { if (!snapshotReaderContexts.isEmpty()) { // Snapshot split mode return pollRecordsFromSnapshotReaders(); @@ -505,7 +506,7 @@ private void startParallelPolling() { LOG.info( "Starting parallel polling for {} snapshot readers", snapshotReaderContexts.size()); - activePollFutures = new ArrayList<>(); + activePollFutures = new CopyOnWriteArrayList<>(); for (int i = 0; i < snapshotReaderContexts.size(); i++) { final int index = i; @@ -563,12 +564,9 @@ private PollResult waitForAnyCompletion() throws Exception { anyOf.join(); // Wait for at least one to complete // Find and process completed futures - Iterator> iterator = activePollFutures.iterator(); - while (iterator.hasNext()) { - CompletableFuture future = iterator.next(); - + for (CompletableFuture future : activePollFutures) { if (future.isDone()) { - iterator.remove(); // Remove from active list + activePollFutures.remove(future); PollResult result = future.get(); if (result != null) { // Found a reader with data, return immediately @@ -867,7 +865,7 @@ public boolean isSnapshotSplit(SourceSplit split) { } @Override - public void finishSplitRecords() { + public synchronized void finishSplitRecords() { // Cancel any active poll operations if (activePollFutures != null) { activePollFutures.forEach(f -> f.cancel(true)); @@ -923,19 +921,20 @@ protected abstract Map discoverTableSchemas( public void close(JobBaseConfig jobConfig) { LOG.info("Close source reader for job {}", jobConfig.getJobId()); - // Cancel any active poll operations - if (activePollFutures != null) { - activePollFutures.forEach(f -> f.cancel(true)); - activePollFutures.clear(); - activePollFutures = null; + // Cancel outside the lock so a thread blocked in anyOf.join() releases it. + List> activePolls = this.activePollFutures; + if (activePolls != null) { + for (CompletableFuture f : activePolls) { + f.cancel(true); + } } - // Clean up all readers - finishSplitRecords(); - - if (tableSchemas != null) { - tableSchemas.clear(); - tableSchemas = null; + synchronized (this) { + finishSplitRecords(); + if (tableSchemas != null) { + tableSchemas.clear(); + tableSchemas = null; + } } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index f576485c2f8cb2..142c18f5925e75 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -79,7 +79,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -88,6 +87,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -123,11 +123,11 @@ public class MySqlSourceReader extends AbstractCdcSourceReader { SnapshotReaderContext< MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState>> snapshotReaderContexts; - private Set completedSplitIds = new HashSet<>(); + private Set completedSplitIds = ConcurrentHashMap.newKeySet(); // Parallel polling support private ExecutorService pollExecutor; - private List> activePollFutures; + private volatile List> activePollFutures; // Binlog reader (single reader for binlog split) private BinlogSplitReader binlogReader; @@ -136,7 +136,7 @@ public class MySqlSourceReader extends AbstractCdcSourceReader { public MySqlSourceReader() { this.serializer = new MySqlDebeziumJsonDeserializer(); - this.snapshotReaderContexts = new ArrayList<>(); + this.snapshotReaderContexts = new CopyOnWriteArrayList<>(); } @Override @@ -341,7 +341,7 @@ private List extractSnapshotSplits( } /** Prepare snapshot splits (unified handling for single or multiple splits) */ - private SplitReadResult prepareSnapshotSplits( + private synchronized SplitReadResult prepareSnapshotSplits( List splits, JobBaseRecordRequest baseReq) throws Exception { LOG.info("Preparing {} snapshot split(s) for reading", splits.size()); @@ -429,7 +429,7 @@ private SplitReadResult prepareSnapshotSplits( } /** Prepare binlog split */ - private SplitReadResult prepareBinlogSplit( + private synchronized SplitReadResult prepareBinlogSplit( Map offsetMeta, JobBaseRecordRequest baseReq) throws Exception { // Load tableSchemas from FE if available (avoids re-discover on restart) tryLoadTableSchemasFromRequest(baseReq); @@ -465,7 +465,7 @@ private SplitReadResult prepareBinlogSplit( } @Override - public Iterator pollRecords() throws Exception { + public synchronized Iterator pollRecords() throws Exception { if (!snapshotReaderContexts.isEmpty()) { // Snapshot split mode return pollRecordsFromSnapshotReaders(); @@ -527,7 +527,7 @@ private void startParallelPolling() { LOG.info( "Starting parallel polling for {} snapshot readers", snapshotReaderContexts.size()); - activePollFutures = new ArrayList<>(); + activePollFutures = new CopyOnWriteArrayList<>(); for (int i = 0; i < snapshotReaderContexts.size(); i++) { final int index = i; @@ -582,12 +582,9 @@ private PollResult waitForAnyCompletion() throws Exception { anyOf.join(); // Wait for at least one to complete // Find and process completed futures - Iterator> iterator = activePollFutures.iterator(); - while (iterator.hasNext()) { - CompletableFuture future = iterator.next(); - + for (CompletableFuture future : activePollFutures) { if (future.isDone()) { - iterator.remove(); // Remove from active list + activePollFutures.remove(future); PollResult result = future.get(); if (result != null) { // Found a reader with data, return immediately @@ -1024,7 +1021,7 @@ public boolean isSnapshotSplit(SourceSplit split) { } @Override - public void finishSplitRecords() { + public synchronized void finishSplitRecords() { // Cancel any active poll operations if (activePollFutures != null) { @@ -1130,10 +1127,20 @@ private Map discoverTableSchemas(JobBaseConfi public void close(JobBaseConfig jobConfig) { LOG.info("Close source reader for job {}", jobConfig.getJobId()); - finishSplitRecords(); - if (tableSchemas != null) { - tableSchemas.clear(); - tableSchemas = null; + // Cancel outside the lock so a thread blocked in anyOf.join() releases it. + List> activePolls = this.activePollFutures; + if (activePolls != null) { + for (CompletableFuture f : activePolls) { + f.cancel(true); + } + } + + synchronized (this) { + finishSplitRecords(); + if (tableSchemas != null) { + tableSchemas.clear(); + tableSchemas = null; + } } } diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy index 83e9382f0728f7..0ddf7cf94b2a77 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_lsn_advance.groovy @@ -84,6 +84,7 @@ suite("test_streaming_postgres_job_slot_lsn_advance", // where confirmed_flush_lsn is meaningful. max_interval=3 keeps each // task short so PAUSE settles fast. sql """CREATE JOB ${jobName} + PROPERTIES ("max_interval" = "3") ON STREAMING FROM POSTGRES ( "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", @@ -98,7 +99,6 @@ suite("test_streaming_postgres_job_slot_lsn_advance", "publication_name" = "${userPub}", "offset" = "latest" ) - PROPERTIES ("max_interval" = "3") TO DATABASE ${currentDb} ( "table.create.properties.replication_num" = "1" ) From c07e1cd0088b4fd48a8ca6d777a81734a2fb5996 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 22 May 2026 18:43:24 +0800 Subject: [PATCH 4/7] [fix](streaming-job) keep cdc reader poll monitor-free; use BigInteger for PG LSN offset --- .../reader/JdbcIncrementalSourceReader.java | 23 +++++-------------- .../reader/mysql/MySqlSourceReader.java | 23 +++++-------------- ...tgres_job_special_offset_restart_fe.groovy | 5 ++-- 3 files changed, 14 insertions(+), 37 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 8a2273029bad2d..d12d1b424a312b 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -444,7 +444,7 @@ private synchronized SplitReadResult prepareStreamSplit( } @Override - public synchronized Iterator pollRecords() throws Exception { + public Iterator pollRecords() throws Exception { if (!snapshotReaderContexts.isEmpty()) { // Snapshot split mode return pollRecordsFromSnapshotReaders(); @@ -918,23 +918,12 @@ protected abstract Map discoverTableSchemas( JobBaseConfig config); @Override - public void close(JobBaseConfig jobConfig) { + public synchronized void close(JobBaseConfig jobConfig) { LOG.info("Close source reader for job {}", jobConfig.getJobId()); - - // Cancel outside the lock so a thread blocked in anyOf.join() releases it. - List> activePolls = this.activePollFutures; - if (activePolls != null) { - for (CompletableFuture f : activePolls) { - f.cancel(true); - } - } - - synchronized (this) { - finishSplitRecords(); - if (tableSchemas != null) { - tableSchemas.clear(); - tableSchemas = null; - } + finishSplitRecords(); + if (tableSchemas != null) { + tableSchemas.clear(); + tableSchemas = null; } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index 142c18f5925e75..be836eb8ffef32 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -465,7 +465,7 @@ private synchronized SplitReadResult prepareBinlogSplit( } @Override - public synchronized Iterator pollRecords() throws Exception { + public Iterator pollRecords() throws Exception { if (!snapshotReaderContexts.isEmpty()) { // Snapshot split mode return pollRecordsFromSnapshotReaders(); @@ -1124,23 +1124,12 @@ private Map discoverTableSchemas(JobBaseConfi } @Override - public void close(JobBaseConfig jobConfig) { + public synchronized void close(JobBaseConfig jobConfig) { LOG.info("Close source reader for job {}", jobConfig.getJobId()); - - // Cancel outside the lock so a thread blocked in anyOf.join() releases it. - List> activePolls = this.activePollFutures; - if (activePolls != null) { - for (CompletableFuture f : activePolls) { - f.cancel(true); - } - } - - synchronized (this) { - finishSplitRecords(); - if (tableSchemas != null) { - tableSchemas.clear(); - tableSchemas = null; - } + finishSplitRecords(); + if (tableSchemas != null) { + tableSchemas.clear(); + tableSchemas = null; } } diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy index 37a607c960e421..5fa538ae422ca4 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset_restart_fe.groovy @@ -79,9 +79,8 @@ suite("test_streaming_postgres_job_special_offset_restart_fe", def lsnRows = sql """SELECT pg_current_wal_lsn()::text""" def lsnStr = lsnRows[0][0].toString() def parts = lsnStr.split("/") - def high = Long.parseLong(parts[0], 16) - def low = Long.parseLong(parts[1], 16) - lsnAtCreate = String.valueOf((high << 32) + low) + lsnAtCreate = new BigInteger(parts[0], 16).shiftLeft(32) + .add(new BigInteger(parts[1], 16)).toString() log.info("CREATE LSN mark: ${lsnStr} -> numeric: ${lsnAtCreate}") // Inserts after the mark: these are what the job should stream. From a417110998685c17f7622991e99e8abcdd091d57 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 25 May 2026 10:05:18 +0800 Subject: [PATCH 5/7] [fix](streaming-job) snapshot activePollFutures locally in waitForAnyCompletion to avoid close NPE race --- .../source/reader/JdbcIncrementalSourceReader.java | 14 +++++++------- .../source/reader/mysql/MySqlSourceReader.java | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index d12d1b424a312b..0b4e13b84449ea 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -556,30 +556,30 @@ private void startParallelPolling() { * data */ private PollResult waitForAnyCompletion() throws Exception { - while (!activePollFutures.isEmpty()) { - // Wait for any future to complete + List> snapshot = activePollFutures; + while (snapshot != null && !snapshot.isEmpty()) { CompletableFuture anyOf = - CompletableFuture.anyOf(activePollFutures.toArray(new CompletableFuture[0])); + CompletableFuture.anyOf(snapshot.toArray(new CompletableFuture[0])); anyOf.join(); // Wait for at least one to complete // Find and process completed futures - for (CompletableFuture future : activePollFutures) { + for (CompletableFuture future : snapshot) { if (future.isDone()) { - activePollFutures.remove(future); + snapshot.remove(future); PollResult result = future.get(); if (result != null) { - // Found a reader with data, return immediately LOG.info( "Got result from reader {}, {} futures remaining", result.context.getSplit().splitId(), - activePollFutures.size()); + snapshot.size()); completedSplitIds.add(result.context.getSplit().splitId()); return result; } // If result is null (no data), continue checking other futures } } + snapshot = activePollFutures; } // All futures completed but none had data return null; diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index be836eb8ffef32..1975b2c02ed23d 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -574,30 +574,30 @@ private void startParallelPolling() { * data */ private PollResult waitForAnyCompletion() throws Exception { - while (!activePollFutures.isEmpty()) { - // Wait for any future to complete + List> snapshot = activePollFutures; + while (snapshot != null && !snapshot.isEmpty()) { CompletableFuture anyOf = - CompletableFuture.anyOf(activePollFutures.toArray(new CompletableFuture[0])); + CompletableFuture.anyOf(snapshot.toArray(new CompletableFuture[0])); anyOf.join(); // Wait for at least one to complete // Find and process completed futures - for (CompletableFuture future : activePollFutures) { + for (CompletableFuture future : snapshot) { if (future.isDone()) { - activePollFutures.remove(future); + snapshot.remove(future); PollResult result = future.get(); if (result != null) { - // Found a reader with data, return immediately LOG.info( "Got result from reader {}, {} futures remaining", result.context.getSplit().splitId(), - activePollFutures.size()); + snapshot.size()); completedSplitIds.add(result.context.getSplit().splitId()); return result; } // If result is null (no data), continue checking other futures } } + snapshot = activePollFutures; } // All futures completed but none had data return null; From 2f03dc8d63a3c0fbebcd1084369c43d0ab245c87 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 25 May 2026 15:28:51 +0800 Subject: [PATCH 6/7] [fix](streaming-job) snapshot stream/binlog reader+split+state locally to avoid close NPE race --- .../reader/JdbcIncrementalSourceReader.java | 19 +++++++++++++------ .../reader/mysql/MySqlSourceReader.java | 19 +++++++++++++------ 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 0b4e13b84449ea..e0b89a9f5d12eb 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -612,24 +612,31 @@ private static class PollResult { /** Poll records from stream reader */ private Iterator pollRecordsFromStreamReader() throws InterruptedException { + Fetcher reader = streamReader; + StreamSplit split = streamSplit; + StreamSplitState state = streamSplitState; + if (reader == null || split == null || state == null) { + LOG.info("Stream reader is null at poll start, returning empty"); + return Collections.emptyIterator(); + } - Preconditions.checkState(streamReader != null, "streamReader is null"); - Preconditions.checkNotNull(streamSplitState, "streamSplitState is null"); - - Iterator dataIt = streamReader.pollSplitRecords(); + Iterator dataIt = reader.pollSplitRecords(); if (dataIt == null || !dataIt.hasNext()) { + if (streamReader == null) { + LOG.info("Stream reader is null after poll, returning empty"); + } return Collections.emptyIterator(); } SourceRecords sourceRecords = dataIt.next(); SplitRecords splitRecords = - new SplitRecords(streamSplit.splitId(), sourceRecords.iterator()); + new SplitRecords(split.splitId(), sourceRecords.iterator()); if (!sourceRecords.getSourceRecordList().isEmpty()) { LOG.info("{} Records received from stream", sourceRecords.getSourceRecordList().size()); } - return new FilteredRecordIterator(splitRecords, streamSplitState); + return new FilteredRecordIterator(splitRecords, state); } protected abstract DataType fromDbzColumn(Column splitColumn); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index 1975b2c02ed23d..091089a161ecf0 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -625,24 +625,31 @@ private static class PollResult { /** Poll records from binlog reader */ private Iterator pollRecordsFromBinlogReader() throws InterruptedException { + BinlogSplitReader reader = binlogReader; + MySqlBinlogSplit split = binlogSplit; + MySqlBinlogSplitState state = binlogSplitState; + if (reader == null || split == null || state == null) { + LOG.info("Binlog reader is null at poll start, returning empty"); + return Collections.emptyIterator(); + } - Preconditions.checkState(binlogReader != null, "binlogReader is null"); - Preconditions.checkNotNull(binlogSplitState, "binlogSplitState is null"); - - Iterator dataIt = binlogReader.pollSplitRecords(); + Iterator dataIt = reader.pollSplitRecords(); if (dataIt == null || !dataIt.hasNext()) { + if (binlogReader == null) { + LOG.info("Binlog reader is null after poll, returning empty"); + } return Collections.emptyIterator(); } SourceRecords sourceRecords = dataIt.next(); SplitRecords splitRecords = - new SplitRecords(binlogSplit.splitId(), sourceRecords.iterator()); + new SplitRecords(split.splitId(), sourceRecords.iterator()); if (!sourceRecords.getSourceRecordList().isEmpty()) { LOG.info("{} Records received from binlog", sourceRecords.getSourceRecordList().size()); } - return new FilteredRecordIterator(splitRecords, binlogSplitState); + return new FilteredRecordIterator(splitRecords, state); } /** From 9351276f7742312036e6cc11fec4c2988e82829e Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 25 May 2026 16:21:53 +0800 Subject: [PATCH 7/7] [fix](streaming-job) spotless format --- .../cdcclient/source/reader/JdbcIncrementalSourceReader.java | 3 +-- .../doris/cdcclient/source/reader/mysql/MySqlSourceReader.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index e0b89a9f5d12eb..ddbc71c7fd7212 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -629,8 +629,7 @@ private Iterator pollRecordsFromStreamReader() throws InterruptedE } SourceRecords sourceRecords = dataIt.next(); - SplitRecords splitRecords = - new SplitRecords(split.splitId(), sourceRecords.iterator()); + SplitRecords splitRecords = new SplitRecords(split.splitId(), sourceRecords.iterator()); if (!sourceRecords.getSourceRecordList().isEmpty()) { LOG.info("{} Records received from stream", sourceRecords.getSourceRecordList().size()); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index 091089a161ecf0..99ac1e0355baca 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -642,8 +642,7 @@ private Iterator pollRecordsFromBinlogReader() throws InterruptedE } SourceRecords sourceRecords = dataIt.next(); - SplitRecords splitRecords = - new SplitRecords(split.splitId(), sourceRecords.iterator()); + SplitRecords splitRecords = new SplitRecords(split.splitId(), sourceRecords.iterator()); if (!sourceRecords.getSourceRecordList().isEmpty()) { LOG.info("{} Records received from binlog", sourceRecords.getSourceRecordList().size());