From b05c7718fff112a21cde9ec76d7970a603371f63 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 3 Dec 2025 11:28:07 +0800 Subject: [PATCH 1/5] fix --- .../PipeInsertNodeTabletInsertionEvent.java | 116 ++++++++++++++---- 1 file changed, 91 insertions(+), 25 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 63cae75fbbd13..87e230cc4eb4d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -147,7 +147,7 @@ private PipeInsertNodeTabletInsertionEvent( isTableModelEvent, databaseNameFromDataRegion); this.insertNode = insertNode; - this.progressIndex = insertNode.getProgressIndex(); + this.progressIndex = Objects.nonNull(insertNode) ? insertNode.getProgressIndex() : null; this.allocatedMemoryBlock = new AtomicReference<>(); } @@ -157,16 +157,26 @@ public InsertNode getInsertNode() { } public ByteBuffer getByteBuffer() throws WALPipeException { - return insertNode.serializeToByteBuffer(); + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + if (!isReleased.get()) { + LOGGER.warn( + "InsertNode is null but event is not released yet. Event: {}", coreReportMessage()); + throw new PipeException( + "InsertNode is null but event is not released, this should not happen"); + } + throw new PipeException("InsertNode has been released"); + } + return node.serializeToByteBuffer(); } public String getDeviceId() { - if (Objects.isNull(insertNode)) { + final InsertNode node = insertNode; + if (Objects.isNull(node)) { return null; } - return Objects.nonNull(insertNode.getTargetPath()) - ? insertNode.getTargetPath().getFullPath() - : null; + final PartialPath targetPath = node.getTargetPath(); + return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null; } public long getExtractTime() { @@ -244,10 +254,21 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP final boolean skipIfNoPrivileges, final long startTime, final long endTime) { + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + if (!isReleased.get()) { + LOGGER.warn( + "InsertNode is null but event is not released yet when creating shallow copy. Event: {}", + coreReportMessage()); + throw new PipeException( + "InsertNode is null but event is not released, this should not happen"); + } + throw new PipeException("InsertNode has been released"); + } return new PipeInsertNodeTabletInsertionEvent( getRawIsTableModelEvent(), getSourceDatabaseNameFromDataRegion(), - insertNode, + node, pipeName, creationTime, pipeTaskMeta, @@ -263,7 +284,17 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP @Override public boolean isGeneratedByPipe() { - return insertNode.isGeneratedByPipe(); + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + if (!isReleased.get()) { + LOGGER.warn( + "InsertNode is null but event is not released yet. Event: {}", coreReportMessage()); + throw new PipeException( + "InsertNode is null but event is not released, this should not happen"); + } + return false; + } + return node.isGeneratedByPipe(); } @Override @@ -271,17 +302,28 @@ public void throwIfNoPrivilege() { if (skipIfNoPrivileges || !isTableModelEvent()) { return; } - if (Objects.nonNull(insertNode.getTargetPath())) { - checkTableName( - DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName()); - } else if (insertNode instanceof InsertRowsNode) { + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + if (!isReleased.get()) { + LOGGER.warn( + "InsertNode is null but event is not released yet. Event: {}", coreReportMessage()); + throw new PipeException( + "InsertNode is null but event is not released, this should not happen"); + } + // Event is released, skip privilege check + return; + } + final PartialPath targetPath = node.getTargetPath(); + if (Objects.nonNull(targetPath)) { + checkTableName(DeviceIDFactory.getInstance().getDeviceID(targetPath).getTableName()); + } else if (node instanceof InsertRowsNode) { for (final String tableName : - ((InsertRowsNode) insertNode) + ((InsertRowsNode) node) .getInsertRowNodeList().stream() .map( - node -> + insertRowNode -> DeviceIDFactory.getInstance() - .getDeviceID(node.getTargetPath()) + .getDeviceID(insertRowNode.getTargetPath()) .getTableName()) .collect(Collectors.toSet())) { checkTableName(tableName); @@ -307,6 +349,11 @@ public boolean mayEventTimeOverlappedWithTimeRange() { try { final InsertNode insertNode = getInsertNode(); if (Objects.isNull(insertNode)) { + if (!isReleased.get()) { + LOGGER.warn( + "InsertNode is null but event is not released yet when checking time overlap. Event: {}", + coreReportMessage()); + } return true; } @@ -351,6 +398,11 @@ public boolean mayEventPathsOverlappedWithPattern() { try { final InsertNode insertNode = getInsertNode(); if (Objects.isNull(insertNode)) { + if (!isReleased.get()) { + LOGGER.warn( + "InsertNode is null but event is not released yet when checking path overlap. Event: {}", + coreReportMessage()); + } return true; } @@ -443,6 +495,15 @@ private List initEventParsers() { eventParsers = new ArrayList<>(); final InsertNode node = getInsertNode(); + if (Objects.isNull(node)) { + if (!isReleased.get()) { + LOGGER.warn( + "InsertNode is null but event is not released yet. Event: {}", coreReportMessage()); + throw new PipeException( + "InsertNode is null but event is not released, this should not happen"); + } + throw new PipeException("InsertNode has been released"); + } switch (node.getType()) { case INSERT_ROW: case INSERT_TABLET: @@ -526,11 +587,12 @@ public List toRawTabletInsertionEvents() { @Override public String toString() { + final InsertNode node = insertNode; return String.format( "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, eventParsers=%s}", progressIndex, - Objects.nonNull(insertNode) ? insertNode.isAligned() : null, - Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null, + Objects.nonNull(node) ? node.isAligned() : null, + Objects.nonNull(node) ? node.isGeneratedByPipe() : null, eventParsers) + " - " + super.toString(); @@ -538,11 +600,12 @@ public String toString() { @Override public String coreReportMessage() { + final InsertNode node = insertNode; return String.format( "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}", progressIndex, - Objects.nonNull(insertNode) ? insertNode.isAligned() : null, - Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null) + Objects.nonNull(node) ? node.isAligned() : null, + Objects.nonNull(node) ? node.isGeneratedByPipe() : null) + " - " + super.coreReportMessage(); } @@ -567,12 +630,15 @@ public PipeEventResource eventResourceBuilder() { // invoked, the event will soon be released. @Override public long ramBytesUsed() { - return bytes > 0 - ? bytes - : (bytes = - INSTANCE_SIZE - + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0) - + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0)); + if (bytes > 0) { + return bytes; + } + final InsertNode node = insertNode; + bytes = + INSTANCE_SIZE + + (Objects.nonNull(node) ? InsertNodeMemoryEstimator.sizeOf(node) : 0) + + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0); + return bytes; } private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource { From d2028b703c6cabbc8af0de32d15051cd29551906 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 3 Dec 2025 11:52:07 +0800 Subject: [PATCH 2/5] fix --- .../task/subtask/sink/PipeSinkSubtask.java | 5 ++- .../PipeInsertNodeTabletInsertionEvent.java | 35 ------------------- 2 files changed, 2 insertions(+), 38 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 8a86db3ded5da..3b8e7b21289ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -143,8 +143,7 @@ protected boolean executeOnce() { } else { LOGGER.info( "{} in pipe transfer, ignored because the connector subtask is dropped.", - e.getClass().getSimpleName(), - e); + e.getClass().getSimpleName()); clearReferenceCountAndReleaseLastEvent(event); } } catch (final Exception e) { @@ -161,7 +160,7 @@ protected boolean executeOnce() { e); } else { LOGGER.info( - "Exception in pipe transfer, ignored because the connector subtask is dropped.", e); + "Exception in pipe transfer, ignored because the connector subtask is dropped."); clearReferenceCountAndReleaseLastEvent(event); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 87e230cc4eb4d..4fc06698f2071 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -159,12 +159,6 @@ public InsertNode getInsertNode() { public ByteBuffer getByteBuffer() throws WALPipeException { final InsertNode node = insertNode; if (Objects.isNull(node)) { - if (!isReleased.get()) { - LOGGER.warn( - "InsertNode is null but event is not released yet. Event: {}", coreReportMessage()); - throw new PipeException( - "InsertNode is null but event is not released, this should not happen"); - } throw new PipeException("InsertNode has been released"); } return node.serializeToByteBuffer(); @@ -256,13 +250,6 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP final long endTime) { final InsertNode node = insertNode; if (Objects.isNull(node)) { - if (!isReleased.get()) { - LOGGER.warn( - "InsertNode is null but event is not released yet when creating shallow copy. Event: {}", - coreReportMessage()); - throw new PipeException( - "InsertNode is null but event is not released, this should not happen"); - } throw new PipeException("InsertNode has been released"); } return new PipeInsertNodeTabletInsertionEvent( @@ -304,12 +291,6 @@ public void throwIfNoPrivilege() { } final InsertNode node = insertNode; if (Objects.isNull(node)) { - if (!isReleased.get()) { - LOGGER.warn( - "InsertNode is null but event is not released yet. Event: {}", coreReportMessage()); - throw new PipeException( - "InsertNode is null but event is not released, this should not happen"); - } // Event is released, skip privilege check return; } @@ -349,11 +330,6 @@ public boolean mayEventTimeOverlappedWithTimeRange() { try { final InsertNode insertNode = getInsertNode(); if (Objects.isNull(insertNode)) { - if (!isReleased.get()) { - LOGGER.warn( - "InsertNode is null but event is not released yet when checking time overlap. Event: {}", - coreReportMessage()); - } return true; } @@ -398,11 +374,6 @@ public boolean mayEventPathsOverlappedWithPattern() { try { final InsertNode insertNode = getInsertNode(); if (Objects.isNull(insertNode)) { - if (!isReleased.get()) { - LOGGER.warn( - "InsertNode is null but event is not released yet when checking path overlap. Event: {}", - coreReportMessage()); - } return true; } @@ -496,12 +467,6 @@ private List initEventParsers() { eventParsers = new ArrayList<>(); final InsertNode node = getInsertNode(); if (Objects.isNull(node)) { - if (!isReleased.get()) { - LOGGER.warn( - "InsertNode is null but event is not released yet. Event: {}", coreReportMessage()); - throw new PipeException( - "InsertNode is null but event is not released, this should not happen"); - } throw new PipeException("InsertNode has been released"); } switch (node.getType()) { From ab487b8a153fcfa819a7c6eee67e3c17e031a5dd Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 3 Dec 2025 11:58:32 +0800 Subject: [PATCH 3/5] fix --- .../common/tablet/PipeInsertNodeTabletInsertionEvent.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 4fc06698f2071..d044bbea0f2ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -273,13 +273,7 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP public boolean isGeneratedByPipe() { final InsertNode node = insertNode; if (Objects.isNull(node)) { - if (!isReleased.get()) { - LOGGER.warn( - "InsertNode is null but event is not released yet. Event: {}", coreReportMessage()); - throw new PipeException( - "InsertNode is null but event is not released, this should not happen"); - } - return false; + throw new PipeException("InsertNode has been released"); } return node.isGeneratedByPipe(); } From 9ccbccb9da88b49bd5852d6eed8360d5ad0a8830 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 3 Dec 2025 12:28:03 +0800 Subject: [PATCH 4/5] fix --- .../event/common/tablet/PipeInsertNodeTabletInsertionEvent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index d044bbea0f2ed..f5ae873bbac31 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -147,7 +147,7 @@ private PipeInsertNodeTabletInsertionEvent( isTableModelEvent, databaseNameFromDataRegion); this.insertNode = insertNode; - this.progressIndex = Objects.nonNull(insertNode) ? insertNode.getProgressIndex() : null; + this.progressIndex = insertNode.getProgressIndex(); this.allocatedMemoryBlock = new AtomicReference<>(); } From 570abc92621d5f48220da882b9be35a260e444da Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 3 Dec 2025 15:56:29 +0800 Subject: [PATCH 5/5] fix --- .../db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 3b8e7b21289ac..f5bafc5d85782 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -142,8 +142,9 @@ protected boolean executeOnce() { throw e; } else { LOGGER.info( - "{} in pipe transfer, ignored because the connector subtask is dropped.", - e.getClass().getSimpleName()); + "{} in pipe transfer, ignored because the connector subtask is dropped.{}", + e.getClass().getSimpleName(), + e.getMessage() != null ? " Message: " + e.getMessage() : ""); clearReferenceCountAndReleaseLastEvent(event); } } catch (final Exception e) { @@ -160,7 +161,8 @@ protected boolean executeOnce() { e); } else { LOGGER.info( - "Exception in pipe transfer, ignored because the connector subtask is dropped."); + "Exception in pipe transfer, ignored because the connector subtask is dropped.{}", + e.getMessage() != null ? " Message: " + e.getMessage() : ""); clearReferenceCountAndReleaseLastEvent(event); } }