Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
import org.apache.doris.rpc.BackendServiceProxy;
Expand Down Expand Up @@ -75,6 +76,9 @@
* chunkHighWatermarkMap is always updated unconditionally to support recovery</li>
* <li>replayIfNeed: checks currentOffset directly — snapshot triggers remainingSplits rebuild
* from meta + chunkHighWatermarkMap; binlog needs no action (currentOffset already set)</li>
* <li>image persistence: chw/bop/ts also flow through getPersistInfo (inherited) so state
* survives FE checkpoint after pre-checkpoint journal is GC'd; restoreFromPersistInfo
* reads them back on startup</li>
* </ul>
*/
@Log4j2
Expand Down Expand Up @@ -113,10 +117,7 @@ public void ensureInitialized(Long jobId, Map<String, String> originTvfProps) th
if (this.jobId != null) {
return;
}
// One-time initialization below — safe to skip on FE restart because the provider
// is reconstructed fresh (getPersistInfo returns null), so jobId is null then too.
this.jobId = jobId;
this.chunkHighWatermarkMap = new HashMap<>();
this.sourceType = resolvedType;
String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
Preconditions.checkArgument(table != null, "table is required for cdc_stream TVF");
Expand Down Expand Up @@ -265,7 +266,8 @@ private List<Map<String, String>> buildCumulativeSnapshotOffset(
* adds it to finishedSplits. During txn replay remainingSplits is empty so removeIf returns
* false naturally — chunkHighWatermarkMap is still updated for replayIfNeed to use later.
*
* <p>Binlog: currentOffset is set above; no extra state needed for TVF recovery path.
* <p>Binlog: currentOffset is set above. Also mirror startingOffset into binlogOffsetPersist
* so it survives FE checkpoint via image (currentOffset has no @SerializedName).
*/
@Override
public void updateOffset(Offset offset) {
Expand All @@ -289,13 +291,19 @@ public void updateOffset(Offset offset) {
chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k -> new HashMap<>())
.put(ss.getSplitId(), ss.getHighWatermark());
}
} else {
// Mirror binlog offset into bop so it survives FE checkpoint
BinlogSplit bs = (BinlogSplit) currentOffset.getSplits().get(0);
if (MapUtils.isNotEmpty(bs.getStartingOffset())) {
binlogOffsetPersist = new HashMap<>(bs.getStartingOffset());
binlogOffsetPersist.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
}
}
// Binlog: currentOffset is already set; no binlogOffsetPersist needed for TVF path.
}

/**
* TVF path recovery: offsetProviderPersist is always null (no EditLog write).
* currentOffset is set by replayOnCommitted/replayOnCloudMode -> updateOffset before this runs.
* TVF path recovery. After FE checkpoint the pre-checkpoint journal is GC'd, so currentOffset
* (no @SerializedName) may be null with prior commits — fall back to bop/chw restored from image.
*
* <ul>
* <li>snapshot: mid-snapshot restart — rebuild remainingSplits from meta + chunkHighWatermarkMap</li>
Expand All @@ -305,14 +313,28 @@ public void updateOffset(Offset offset) {
@Override
public void replayIfNeed(StreamingInsertJob job) throws JobException {
if (currentOffset == null) {
// No committed txn yet. If snapshot splits exist in the meta table (written by
// initOnCreate), restore remainingSplits so getNextOffset() returns snapshot splits
// instead of a BinlogSplit (which would incorrectly skip the snapshot phase).
// Post-checkpoint binlog: rebuild from bop persisted in image
if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
currentOffset = new JdbcOffset(
Collections.singletonList(new BinlogSplit(binlogOffsetPersist)));
log.info("Replaying TVF offset provider for job {}: restored binlog offset from persist",
job.getJobId());
return;
}
// Fresh-create or post-checkpoint mid-snapshot: restore remainingSplits from meta
// so getNextOffset() returns snapshot splits instead of an empty BinlogSplit.
Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
if (MapUtils.isNotEmpty(snapshotSplits)) {
recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
log.info("Replaying TVF offset provider for job {}: no committed txn,"
+ " restored {} remaining splits from meta", job.getJobId(), remainingSplits.size());
// chw outer key may be "null.null" during journal replay (sourceProperties uninitialized); remap
Map<String, Map<String, Map<String, String>>> effective =
MapUtils.isNotEmpty(chunkHighWatermarkMap)
? remapChunkHighWatermarkMap(snapshotSplits)
: new HashMap<>();
recalculateRemainingSplits(effective, snapshotSplits);
log.info("Replaying TVF offset provider for job {}: no current offset,"
+ " restored {} remaining splits from meta (chw size={})",
job.getJobId(), remainingSplits.size(),
chunkHighWatermarkMap == null ? 0 : chunkHighWatermarkMap.size());
} else {
log.info("Replaying TVF offset provider for job {}: no committed txn,"
+ " no snapshot splits in meta", job.getJobId());
Expand Down Expand Up @@ -398,13 +420,26 @@ private Map<String, Map<String, Map<String, String>>> remapChunkHighWatermarkMap
}

/**
* TVF path does not persist to EditLog; state is recovered via txn replay.
* This override is defensive — the persistOffsetProviderIfNeed() call path
* only runs in the non-TVF commitOffset flow and won't reach here.
* Restore chw/bop/ts from the image-persisted JSON. Called by gsonPostProcess on FE startup
* before any journal replay; recovers state lost when pre-checkpoint journal is GC'd.
*/
@Override
public String getPersistInfo() {
return null;
public void restoreFromPersistInfo(String persistInfo) {
if (persistInfo == null) {
return;
}
try {
JdbcSourceOffsetProvider tmp = GsonUtils.GSON.fromJson(persistInfo, JdbcSourceOffsetProvider.class);
this.chunkHighWatermarkMap = tmp.getChunkHighWatermarkMap();
this.binlogOffsetPersist = tmp.getBinlogOffsetPersist();
this.tableSchemas = tmp.getTableSchemas();
log.info("Restored TVF offset provider from persist: chw={}, bop={}, ts.len={}",
chunkHighWatermarkMap == null ? 0 : chunkHighWatermarkMap.size(),
binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(),
tableSchemas == null ? 0 : tableSchemas.length());
} catch (Exception e) {
log.warn("Failed to restore TVF offset provider from persistInfo", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !final_data --
A1 1
B1 2
C1 3
D1 4
E1 5
F1 6
G1 7
H1 8
I1 9

Loading
Loading