diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java index 8e2e5eff9dffd5..395cc47f20ed00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java @@ -34,6 +34,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PRequestCdcClientResult; import org.apache.doris.rpc.BackendServiceProxy; @@ -75,6 +76,9 @@ * chunkHighWatermarkMap is always updated unconditionally to support recovery *
  • replayIfNeed: checks currentOffset directly — snapshot triggers remainingSplits rebuild * from meta + chunkHighWatermarkMap; binlog needs no action (currentOffset already set)
  • + *
  • image persistence: chw/bop/ts also flow through getPersistInfo (inherited) so state + * survives FE checkpoint after pre-checkpoint journal is GC'd; restoreFromPersistInfo + * reads them back on startup
  • * */ @Log4j2 @@ -113,10 +117,7 @@ public void ensureInitialized(Long jobId, Map originTvfProps) th if (this.jobId != null) { return; } - // One-time initialization below — safe to skip on FE restart because the provider - // is reconstructed fresh (getPersistInfo returns null), so jobId is null then too. this.jobId = jobId; - this.chunkHighWatermarkMap = new HashMap<>(); this.sourceType = resolvedType; String table = originTvfProps.get(DataSourceConfigKeys.TABLE); Preconditions.checkArgument(table != null, "table is required for cdc_stream TVF"); @@ -265,7 +266,8 @@ private List> buildCumulativeSnapshotOffset( * adds it to finishedSplits. During txn replay remainingSplits is empty so removeIf returns * false naturally — chunkHighWatermarkMap is still updated for replayIfNeed to use later. * - *

    Binlog: currentOffset is set above; no extra state needed for TVF recovery path. + *

    Binlog: currentOffset is set above. Also mirror startingOffset into binlogOffsetPersist + * so it survives FE checkpoint via image (currentOffset has no @SerializedName). */ @Override public void updateOffset(Offset offset) { @@ -289,13 +291,19 @@ public void updateOffset(Offset offset) { chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k -> new HashMap<>()) .put(ss.getSplitId(), ss.getHighWatermark()); } + } else { + // Mirror binlog offset into bop so it survives FE checkpoint + BinlogSplit bs = (BinlogSplit) currentOffset.getSplits().get(0); + if (MapUtils.isNotEmpty(bs.getStartingOffset())) { + binlogOffsetPersist = new HashMap<>(bs.getStartingOffset()); + binlogOffsetPersist.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } } - // Binlog: currentOffset is already set; no binlogOffsetPersist needed for TVF path. } /** - * TVF path recovery: offsetProviderPersist is always null (no EditLog write). - * currentOffset is set by replayOnCommitted/replayOnCloudMode -> updateOffset before this runs. + * TVF path recovery. After FE checkpoint the pre-checkpoint journal is GC'd, so currentOffset + * (no @SerializedName) may be null with prior commits — fall back to bop/chw restored from image. * *