From 230034876f910199df8509ce9568cc01b0e98947 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Mar 2026 19:34:22 +0800 Subject: [PATCH 1/5] new --- .../tablet/PipeRawTabletInsertionEvent.java | 25 ++++++++----------- .../progress/interval/PipeCommitInterval.java | 3 ++- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 1377c4153195f..b72815adf9ae2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -85,14 +85,18 @@ private PipeRawTabletInsertionEvent( this.allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); - addOnCommittedHook( - () -> { - if (shouldReportOnCommit) { - eliminateProgressIndex(); - } - }); + triggerAddHook(); } + private void triggerAddHook() { + if (shouldReportOnCommit && needToReport && sourceEvent instanceof PipeTsFileInsertionEvent) { + final PipeTsFileInsertionEvent event = ((PipeTsFileInsertionEvent) sourceEvent); + addOnCommittedHook(event::eliminateProgressIndex); + } + } + + + public PipeRawTabletInsertionEvent( final Tablet tablet, final boolean isAligned, @@ -181,14 +185,6 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa return true; } - protected void eliminateProgressIndex() { - if (needToReport) { - if (sourceEvent instanceof PipeTsFileInsertionEvent) { - ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex(); - } - } - } - @Override public void bindProgressIndex(final ProgressIndex overridingProgressIndex) { // Normally not all events need to report progress, but if the overridingProgressIndex @@ -254,6 +250,7 @@ public boolean mayEventPathsOverlappedWithPattern() { public void markAsNeedToReport() { this.needToReport = true; + triggerAddHook(); } // This getter is reserved for user-defined plugins diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java index 885df4727dac7..456acd646dccf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.interval.Interval; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -43,7 +44,7 @@ public PipeCommitInterval( this.pipeTaskMeta = pipeTaskMeta; this.currentIndex = Objects.nonNull(currentIndex) ? currentIndex : MinimumProgressIndex.INSTANCE; - this.onCommittedHooks = onCommittedHooks; + this.onCommittedHooks = new ArrayList<>(onCommittedHooks); } @Override From 95abd655245326d56f0b4985d0aac64e9099039d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Mar 2026 19:34:22 +0800 Subject: [PATCH 2/5] fix-data --- .../tablet/PipeRawTabletInsertionEvent.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index b72815adf9ae2..e6e6ef76a15cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -185,6 +185,12 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa return true; } + protected void eliminateProgressIndex() { + if (sourceEvent instanceof PipeTsFileInsertionEvent) { + ((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex(); + } + } + @Override public void bindProgressIndex(final ProgressIndex overridingProgressIndex) { // Normally not all events need to report progress, but if the overridingProgressIndex @@ -249,6 +255,14 @@ public boolean mayEventPathsOverlappedWithPattern() { } public void markAsNeedToReport() { + if (!needToReport) { + addOnCommittedHook( + () -> { + if (shouldReportOnCommit) { + eliminateProgressIndex(); + } + }); + } this.needToReport = true; triggerAddHook(); } @@ -267,6 +281,11 @@ public EnrichedEvent getSourceEvent() { return sourceEvent; } + @Override + public boolean isShouldReportOnCommit() { + return shouldReportOnCommit && needToReport; + } + /////////////////////////// TabletInsertionEvent /////////////////////////// @Override From 5c1369c992882ae3e5085210d666d9e7c41dbe93 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Mar 2026 19:58:09 +0800 Subject: [PATCH 3/5] fix --- .../event/common/tsfile/PipeTsFileInsertionEvent.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 858c425c3e6cc..f099301a63cdd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -375,6 +375,17 @@ public boolean mayEventTimeOverlappedWithTimeRange() { || startTime <= resource.getFileEndTime() && resource.getFileStartTime() <= endTime; } + @Override + public boolean shouldParseTime() { + if (!isTimeParsed + && Objects.nonNull(resource) + && startTime <= resource.getFileStartTime() + && resource.getFileEndTime() <= endTime) { + isTimeParsed = true; + } + return !isTimeParsed; + } + @Override public boolean mayEventPathsOverlappedWithPattern() { if (Objects.isNull(resource) || !resource.isClosed()) { From 2723de18d51f5058b6ec6171fed46c1651052941 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Mar 2026 20:04:42 +0800 Subject: [PATCH 4/5] revert --- .../common/tablet/PipeRawTabletInsertionEvent.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index e6e6ef76a15cb..27ea4cb7c6c95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -84,19 +84,8 @@ private PipeRawTabletInsertionEvent( // Allocate empty memory block, will be resized later. this.allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); - - triggerAddHook(); - } - - private void triggerAddHook() { - if (shouldReportOnCommit && needToReport && sourceEvent instanceof PipeTsFileInsertionEvent) { - final PipeTsFileInsertionEvent event = ((PipeTsFileInsertionEvent) sourceEvent); - addOnCommittedHook(event::eliminateProgressIndex); - } } - - public PipeRawTabletInsertionEvent( final Tablet tablet, final boolean isAligned, @@ -264,7 +253,6 @@ public void markAsNeedToReport() { }); } this.needToReport = true; - triggerAddHook(); } // This getter is reserved for user-defined plugins From 8c7be041866a4fd8c83117804ecda163776b7d29 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Mar 2026 20:06:12 +0800 Subject: [PATCH 5/5] fix --- .../event/common/tablet/PipeRawTabletInsertionEvent.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 27ea4cb7c6c95..d322291934ffe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -84,6 +84,15 @@ private PipeRawTabletInsertionEvent( // Allocate empty memory block, will be resized later. this.allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); + + if (needToReport) { + addOnCommittedHook( + () -> { + if (shouldReportOnCommit) { + eliminateProgressIndex(); + } + }); + } } public PipeRawTabletInsertionEvent(