Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ public class DataSourceConfigKeys {
public static final String SCHEMA = "schema";
public static final String INCLUDE_TABLES = "include_tables";
public static final String EXCLUDE_TABLES = "exclude_tables";
// initial,earliest,latest,{binlog,position},\d{13}
// initial,earliest,latest,snapshot,{binlog,position},\d{13}
public static final String OFFSET = "offset";
public static final String OFFSET_INITIAL = "initial";
public static final String OFFSET_EARLIEST = "earliest";
public static final String OFFSET_LATEST = "latest";
public static final String OFFSET_SNAPSHOT = "snapshot";
public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ private static boolean isValidValue(String key, String value) {

if (key.equals(DataSourceConfigKeys.OFFSET)
&& !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
|| value.equals(DataSourceConfigKeys.OFFSET_LATEST))) {
|| value.equals(DataSourceConfigKeys.OFFSET_LATEST)
|| value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
return false;
Comment on lines 84 to 88
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,12 @@ public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException
}

Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
if (offsetProvider.hasReachedEnd()) {
// offset provider has reached a natural end, mark job as finished
log.info("Streaming insert job {} source data fully consumed, marking job as FINISHED", getJobId());
updateJobStatus(JobStatus.FINISHED);
return;
}
Comment on lines 622 to +628
AbstractStreamingTask nextTask = createStreamingTask();
this.runningStreamTask = nextTask;
log.info("Streaming insert job {} create next streaming insert task {} after task {} success",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public void before() throws Exception {
this.status = TaskStatus.RUNNING;
this.startTimeMs = System.currentTimeMillis();
this.runningOffset = offsetProvider.getNextOffset(null, sourceProperties);
if (this.runningOffset == null) {
// snapshot-only mode: all splits completed, task exits immediately
log.info("streaming multi task {} offset is null (snapshot-only completed), skip execution", taskId);
return;
}
log.info("streaming multi task {} get running offset: {}", taskId, runningOffset.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (High): Early return from before() when offset is null leaves the job stuck

When getNextOffset() returns null (snapshot-only completed), this task returns early from before(), then run() also returns early, then onSuccess() returns false without calling onStreamTaskSuccess(). The StreamingMultiTblTask.onSuccess() never calls onStreamTaskSuccess() — it always returns false (line 243).

The job completion signal for multi-table tasks only comes via the external commitOffset() → successCallback() → onStreamTaskSuccess() callback path, which is never triggered for a no-op task (no data sent to BE, no offset committed).

Result: The task finishes silently, the job stays in RUNNING with a dead task reference, eventually times out to PAUSED, auto-resumes to PENDING, creates another null-offset task, and loops forever.

Suggested fix: In StreamingMultiTblTask.onSuccess(), when runningOffset == null, directly call streamingInsertJob.onStreamTaskSuccess(this) (after acquiring the write lock via the appropriate path). Alternatively, handle this in before() by checking offsetProvider.hasReachedEnd() and signaling the job directly.

}

Expand All @@ -119,6 +124,10 @@ public void run() throws JobException {
log.info("streaming task has been canceled, task id is {}", getTaskId());
return;
}
if (this.runningOffset == null) {
// offset is null when source has reached end (e.g. snapshot-only mode completed)
return;
}
Comment on lines 112 to +130
sendWriteRequest();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,14 @@ default String getPersistInfo() {
return null;
}

/**
 * Returns true if the provider has reached a natural completion point
 * and the job should be marked as FINISHED (e.g. a snapshot-only source
 * whose splits have all been consumed).
 * Default: false (most providers stream indefinitely and never finish).
 *
 * @return true when this provider will never produce more data
 */
default boolean hasReachedEnd() {
    return false;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String>
nextOffset.setSplits(snapshotSplits);
return nextOffset;
} else if (currentOffset != null && currentOffset.snapshotSplit()) {
// snapshot to binlog
if (isSnapshotOnlyMode()) {
// snapshot-only mode: all splits done, signal job to stop
return null;
Comment on lines +120 to +121
}
// initial mode: snapshot to binlog
BinlogSplit binlogSplit = new BinlogSplit();
binlogSplit.setFinishedSplits(finishedSplits);
nextOffset.setSplits(Collections.singletonList(binlogSplit));
Expand Down Expand Up @@ -243,6 +247,9 @@ public boolean hasMoreDataToConsume() {
}

if (currentOffset.snapshotSplit()) {
if (isSnapshotOnlyMode() && remainingSplits.isEmpty()) {
return false;
}
return true;
}

Expand Down Expand Up @@ -376,10 +383,13 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException {
if (!lastSnapshotSplits.isEmpty()) {
currentOffset.setSplits(lastSnapshotSplits);
} else {
// when snapshot to binlog phase fe restarts
BinlogSplit binlogSplit = new BinlogSplit();
binlogSplit.setFinishedSplits(finishedSplits);
currentOffset.setSplits(Collections.singletonList(binlogSplit));
if (!isSnapshotOnlyMode()) {
// initial mode: rebuild binlog split for snapshot-to-binlog transition
BinlogSplit binlogSplit = new BinlogSplit();
binlogSplit.setFinishedSplits(finishedSplits);
currentOffset.setSplits(Collections.singletonList(binlogSplit));
}
// snapshot-only: leave splits empty, hasReachedEnd() will return true
}
}
}
Expand Down Expand Up @@ -535,7 +545,21 @@ private boolean checkNeedSplitChunks(Map<String, String> sourceProperties) {
if (startMode == null) {
return false;
}
return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode);
return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode)
|| DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startMode);
}

/**
 * Whether this job was configured with offset=snapshot, i.e. snapshot-only
 * mode: sync the existing data once and finish, with no binlog phase.
 */
private boolean isSnapshotOnlyMode() {
    return DataSourceConfigKeys.OFFSET_SNAPSHOT
            .equalsIgnoreCase(sourceProperties.get(DataSourceConfigKeys.OFFSET));
}

@Override
public boolean hasReachedEnd() {
return isSnapshotOnlyMode()
&& currentOffset != null
&& currentOffset.snapshotSplit()
&& remainingSplits.isEmpty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: hasReachedEnd() calls currentOffset.snapshotSplit() which requires non-null, non-empty splits (enforced by Preconditions.checkState in JdbcOffset.snapshotSplit()). After crash recovery for a completed snapshot-only job, currentOffset has null splits (see comment on replayIfNeed), so this method will throw IllegalStateException.

Consider either:

  1. Adding a null-safe split check here directly: currentOffset.getSplits() != null && !currentOffset.getSplits().isEmpty() && currentOffset.snapshotSplit()
  2. Or ensuring replayIfNeed always leaves currentOffset in a valid state with actual splits set.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)

// Check startup mode - for PostgreSQL, we use similar logic as MySQL
String startupMode = ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) {
if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
|| DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
remainingSnapshotSplits =
startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
List<MySqlSnapshotSplit> remainingSnapshotSplits = new ArrayList<>();
MySqlBinlogSplit remainingBinlogSplit = null;
if (startupMode.equals(StartupMode.INITIAL)) {
if (startupMode.equals(StartupMode.INITIAL) || startupMode.equals(StartupMode.SNAPSHOT)) {
remainingSnapshotSplits =
startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig());
} else {
Expand Down Expand Up @@ -789,8 +789,9 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, String> cdcConfig, Str
// setting startMode
String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) {
// do not need set offset when initial
// configFactory.startupOptions(StartupOptions.initial());
configFactory.startupOptions(StartupOptions.initial());
} else if (DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
configFactory.startupOptions(StartupOptions.snapshot());
} else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
configFactory.startupOptions(StartupOptions.earliest());
BinlogOffset binlogOffset =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ private PostgresSourceConfig generatePostgresConfig(
String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) {
configFactory.startupOptions(StartupOptions.initial());
} else if (DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
configFactory.startupOptions(StartupOptions.snapshot());
} else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
configFactory.startupOptions(StartupOptions.earliest());
} else if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_snapshot_table1 --
A1 1
B1 2

-- !select_snapshot_table2 --
A2 1
B2 2

-- !select_after_finish_table1 --
A1 1
B1 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_snapshot_table1 --
A1 1
B1 2

-- !select_snapshot_table2 --
A2 1
B2 2

-- !select_after_finish_table1 --
A1 1
B1 2

Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

/**
* Test snapshot-only mode (offset=snapshot):
* 1. Job syncs existing data via full snapshot.
* 2. Job transitions to FINISHED after snapshot completes (no binlog phase).
* 3. Data inserted after job finishes is NOT synced to Doris.
*/
suite("test_streaming_mysql_job_snapshot", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
    def jobName = "test_streaming_mysql_job_snapshot_name"
    def currentDb = (sql "select database()")[0][0]
    def table1 = "user_info_mysql_snapshot1"
    def table2 = "user_info_mysql_snapshot2"
    def mysqlDb = "test_cdc_db"

    // Clean up any leftovers from a previous run so results are deterministic.
    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    sql """drop table if exists ${currentDb}.${table1} force"""
    sql """drop table if exists ${currentDb}.${table2} force"""

    // The whole test is gated on the external JDBC/MySQL docker environment.
    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        String s3_endpoint = getS3Endpoint()
        String bucket = getS3BucketName()
        // JDBC driver jar served from the regression S3 bucket.
        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"

        // prepare source tables and pre-existing data in mysql
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}"""
            sql """CREATE TABLE ${mysqlDb}.${table1} (
                `name` varchar(200) NOT NULL,
                `age` int DEFAULT NULL,
                PRIMARY KEY (`name`)
                ) ENGINE=InnoDB"""
            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 1)"""
            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 2)"""
            sql """CREATE TABLE ${mysqlDb}.${table2} (
                `name` varchar(200) NOT NULL,
                `age` int DEFAULT NULL,
                PRIMARY KEY (`name`)
                ) ENGINE=InnoDB"""
            sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('A2', 1)"""
            sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('B2', 2)"""
        }

        // create streaming job with offset=snapshot (snapshot-only mode)
        sql """CREATE JOB ${jobName}
            ON STREAMING
            FROM MYSQL (
            "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysqlDb}",
            "driver_url" = "${driver_url}",
            "driver_class" = "com.mysql.cj.jdbc.Driver",
            "user" = "root",
            "password" = "123456",
            "database" = "${mysqlDb}",
            "include_tables" = "${table1},${table2}",
            "offset" = "snapshot"
            )
            TO DATABASE ${currentDb} (
            "table.create.properties.replication_num" = "1"
            )
            """

        // wait for job to transition to FINISHED
        // (unlike initial mode, a snapshot-only job must finish on its own
        // rather than staying in RUNNING to consume binlog)
        try {
            Awaitility.await().atMost(300, SECONDS)
                    .pollInterval(2, SECONDS).until(
                    {
                        def jobStatus = sql """select Status from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
                        log.info("jobStatus: " + jobStatus)
                        jobStatus.size() == 1 && jobStatus.get(0).get(0) == 'FINISHED'
                    }
            )
        } catch (Exception ex) {
            // Dump job/task state to the log before failing, for diagnosis.
            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
            log.info("show job: " + showjob)
            log.info("show task: " + showtask)
            throw ex
        }

        // verify snapshot data is correctly synced
        qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name asc """
        qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name asc """

        // insert new data into mysql after job is FINISHED
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1', 3)"""
            sql """UPDATE ${mysqlDb}.${table1} SET age = 99 WHERE name = 'A1'"""
        }

        // wait a bit and confirm new data is NOT synced (job is done, no binlog reading)
        // NOTE(review): fixed 30s sleep is a heuristic — a shorter wait could
        // miss a slow erroneous sync; kept as-is to match sibling tests.
        sleep(30000)

        // Expected output must still match the original snapshot (no C1, A1 age unchanged).
        qt_select_after_finish_table1 """ SELECT * FROM ${table1} order by name asc """

        // verify job status remains FINISHED
        def jobInfo = sql """select Status from jobs("type"="insert") where Name='${jobName}'"""
        log.info("jobInfo after incremental insert: " + jobInfo)
        assert jobInfo.get(0).get(0) == 'FINISHED'

        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    }
}
Loading
Loading