From c3c6916204254233e16b119f6b31816daa68b4a3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Mar 2026 19:34:22 +0800 Subject: [PATCH 1/2] fix-data --- .../tablet/PipeRawTabletInsertionEvent.java | 33 +++++++++++++------ .../progress/interval/PipeCommitInterval.java | 3 +- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index c4f900b7d5f9c..7e85106af346e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -116,12 +116,14 @@ private PipeRawTabletInsertionEvent( this.allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); - addOnCommittedHook( - () -> { - if (shouldReportOnCommit) { - eliminateProgressIndex(); - } - }); + if (needToReport) { + addOnCommittedHook( + () -> { + if (shouldReportOnCommit) { + eliminateProgressIndex(); + } + }); + } } public PipeRawTabletInsertionEvent( @@ -303,10 +305,8 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa } protected void eliminateProgressIndex() { - if (needToReport) { - if (sourceEvent instanceof PipeTsFileInsertionEvent) { - ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex(); - } + if (sourceEvent instanceof PipeTsFileInsertionEvent) { + ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex(); } } @@ -387,6 +387,14 @@ public boolean mayEventPathsOverlappedWithPattern() { } public void markAsNeedToReport() { + if (!needToReport) { + addOnCommittedHook( + () -> { + if (shouldReportOnCommit) { + eliminateProgressIndex(); + } + }); + } this.needToReport = true; } @@ -404,6 +412,11 @@ public EnrichedEvent getSourceEvent() { return sourceEvent; } + @Override + public boolean isShouldReportOnCommit() { + return shouldReportOnCommit && needToReport; + } + /////////////////////////// TabletInsertionEvent /////////////////////////// @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java index 885df4727dac7..456acd646dccf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.interval.Interval; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -43,7 +44,7 @@ public PipeCommitInterval( this.pipeTaskMeta = pipeTaskMeta; this.currentIndex = Objects.nonNull(currentIndex) ? currentIndex : MinimumProgressIndex.INSTANCE; - this.onCommittedHooks = onCommittedHooks; + this.onCommittedHooks = new ArrayList<>(onCommittedHooks); } @Override From b2f35286aec48fe5c1f7d82450e477e971decb02 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Mar 2026 19:58:09 +0800 Subject: [PATCH 2/2] fix --- .../event/common/tsfile/PipeTsFileInsertionEvent.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 316b728a278a9..1505e15996fe6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -561,6 +561,17 @@ public boolean mayEventTimeOverlappedWithTimeRange() { || startTime <= resource.getFileEndTime() && resource.getFileStartTime() <= endTime; } + @Override + public boolean shouldParseTime() { + if (!isTimeParsed + && Objects.nonNull(resource) + && startTime <= resource.getFileStartTime() + && resource.getFileEndTime() <= endTime) { + isTimeParsed = true; + } + return !isTimeParsed; + } + @Override public boolean mayEventPathsOverlappedWithPattern() { if (Objects.isNull(resource) || !resource.isClosed() || isTableModelEvent()) {