Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;

import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand Down Expand Up @@ -115,7 +116,23 @@ public synchronized void close() {
events.clear();
}

public void decreaseEventsReferenceCount(final String holderMessage, final boolean shouldReport) {
/**
* Discard all events of the given pipe. This method only clears the reference count of the events
* and discard them, but do not modify other objects (such as buffers) for simplicity.
*/
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
events.removeIf(
event -> {
if (pipeNameToDrop.equals(event.getPipeName())) {
event.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
return true;
}
return false;
});
}

public synchronized void decreaseEventsReferenceCount(
final String holderMessage, final boolean shouldReport) {
events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ public boolean isEmpty() {
&& endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
}

public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
defaultBatch.discardEventsOfPipe(pipeNameToDrop);
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop));
}

@Override
public synchronized void close() {
defaultBatch.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,11 +515,9 @@ public synchronized void clearRetryEventsReferenceCount() {

//////////////////////////// Operations for close ////////////////////////////

/**
* When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard
* its queued events in the output pipe connector.
*/
@Override
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ private void doTransfer(
LOGGER.info("Successfully transferred file {}.", tsFile);
}

@Override
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
}

@Override
public void close() {
if (tabletBatchBuilder != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask;
Expand Down Expand Up @@ -268,8 +269,8 @@ public void discardEventsOfPipe(final String pipeNameToDrop) {
}
}

if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
((IoTDBDataRegionAsyncConnector) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
if (outputPipeConnector instanceof IoTDBConnector) {
((IoTDBConnector) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public synchronized void register() {
* the {@link PipeConnectorSubtask} should never be used again
* @throws IllegalStateException if {@link PipeConnectorSubtaskLifeCycle#registeredTaskCount} <= 0
*/
public synchronized boolean deregister(String pipeNameToDeregister) {
public synchronized boolean deregister(final String pipeNameToDeregister) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,14 @@ public void rateLimitIfNeeded(
GLOBAL_RATE_LIMITER.acquire(bytesLength);
}

/**
* When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard
* its batched or queued events in the output pipe connector.
*/
public synchronized void discardEventsOfPipe(final String pipeName) {
// Do nothing by default
}

public PipeReceiverStatusHandler statusHandler() {
return receiverStatusHandler;
}
Expand Down