diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 8a86db3ded5d..f5bafc5d8578 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -142,9 +142,9 @@ protected boolean executeOnce() { throw e; } else { LOGGER.info( - "{} in pipe transfer, ignored because the connector subtask is dropped.", + "{} in pipe transfer, ignored because the connector subtask is dropped.{}", e.getClass().getSimpleName(), - e); + e.getMessage() != null ? " Message: " + e.getMessage() : ""); clearReferenceCountAndReleaseLastEvent(event); } } catch (final Exception e) { @@ -161,7 +161,8 @@ protected boolean executeOnce() { e); } else { LOGGER.info( - "Exception in pipe transfer, ignored because the connector subtask is dropped.", e); + "Exception in pipe transfer, ignored because the connector subtask is dropped.{}", + e.getMessage() != null ? " Message: " + e.getMessage() : ""); clearReferenceCountAndReleaseLastEvent(event); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 63cae75fbbd1..f5ae873bbac3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -157,16 +157,20 @@ public InsertNode getInsertNode() { } public ByteBuffer getByteBuffer() throws WALPipeException { - return insertNode.serializeToByteBuffer(); + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + throw new PipeException("InsertNode has been released"); + } + return node.serializeToByteBuffer(); } public String getDeviceId() { - if (Objects.isNull(insertNode)) { + final InsertNode node = insertNode; + if (Objects.isNull(node)) { return null; } - return Objects.nonNull(insertNode.getTargetPath()) - ? insertNode.getTargetPath().getFullPath() - : null; + final PartialPath targetPath = node.getTargetPath(); + return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null; } public long getExtractTime() { @@ -244,10 +248,14 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP final boolean skipIfNoPrivileges, final long startTime, final long endTime) { + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + throw new PipeException("InsertNode has been released"); + } return new PipeInsertNodeTabletInsertionEvent( getRawIsTableModelEvent(), getSourceDatabaseNameFromDataRegion(), - insertNode, + node, pipeName, creationTime, pipeTaskMeta, @@ -263,7 +271,11 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP @Override public boolean isGeneratedByPipe() { - return insertNode.isGeneratedByPipe(); + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + throw new PipeException("InsertNode has been released"); + } + return node.isGeneratedByPipe(); } @Override @@ -271,17 +283,22 @@ public void throwIfNoPrivilege() { if (skipIfNoPrivileges || !isTableModelEvent()) { return; } - if (Objects.nonNull(insertNode.getTargetPath())) { - checkTableName( - DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName()); - } else if (insertNode instanceof InsertRowsNode) { + final InsertNode node = insertNode; + if (Objects.isNull(node)) { + // Event is released, skip privilege check + return; + } + final PartialPath targetPath = node.getTargetPath(); + if (Objects.nonNull(targetPath)) { + checkTableName(DeviceIDFactory.getInstance().getDeviceID(targetPath).getTableName()); + } else if (node instanceof InsertRowsNode) { for (final String tableName : - ((InsertRowsNode) insertNode) + ((InsertRowsNode) node) .getInsertRowNodeList().stream() .map( - node -> + insertRowNode -> DeviceIDFactory.getInstance() - .getDeviceID(node.getTargetPath()) + .getDeviceID(insertRowNode.getTargetPath()) .getTableName()) .collect(Collectors.toSet())) { checkTableName(tableName); @@ -443,6 +460,9 @@ private List initEventParsers() { eventParsers = new ArrayList<>(); final InsertNode node = getInsertNode(); + if (Objects.isNull(node)) { + throw new PipeException("InsertNode has been released"); + } switch (node.getType()) { case INSERT_ROW: case INSERT_TABLET: @@ -526,11 +546,12 @@ public List toRawTabletInsertionEvents() { @Override public String toString() { + final InsertNode node = insertNode; return String.format( "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, eventParsers=%s}", progressIndex, - Objects.nonNull(insertNode) ? insertNode.isAligned() : null, - Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null, + Objects.nonNull(node) ? node.isAligned() : null, + Objects.nonNull(node) ? node.isGeneratedByPipe() : null, eventParsers) + " - " + super.toString(); @@ -538,11 +559,12 @@ public String toString() { @Override public String coreReportMessage() { + final InsertNode node = insertNode; return String.format( "PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}", progressIndex, - Objects.nonNull(insertNode) ? insertNode.isAligned() : null, - Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null) + Objects.nonNull(node) ? node.isAligned() : null, + Objects.nonNull(node) ? node.isGeneratedByPipe() : null) + " - " + super.coreReportMessage(); } @@ -567,12 +589,15 @@ public PipeEventResource eventResourceBuilder() { // invoked, the event will soon be released. @Override public long ramBytesUsed() { - return bytes > 0 - ? bytes - : (bytes = - INSTANCE_SIZE - + (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0) - + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0)); + if (bytes > 0) { + return bytes; + } + final InsertNode node = insertNode; + bytes = + INSTANCE_SIZE + + (Objects.nonNull(node) ? InsertNodeMemoryEstimator.sizeOf(node) : 0) + + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0); + return bytes; } private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource {