From 060cfb48c4ed3b25eb24ea4919126fd036beadfc Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Sat, 13 Jul 2024 16:52:24 +0800 Subject: [PATCH 1/8] setup --- .../broker/SubscriptionPrefetchingQueue.java | 72 ++++++++----------- .../SubscriptionPrefetchingTabletQueue.java | 15 ++-- .../SubscriptionPrefetchingTsFileQueue.java | 2 +- ...FileDeduplicationBlockingPendingQueue.java | 29 +++++++- 4 files changed, 66 insertions(+), 52 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 2d23950fe752f..320711b079d75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -19,12 +19,10 @@ package org.apache.iotdb.db.subscription.broker; -import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; -import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; @@ -93,7 +91,7 @@ public void cleanup() { public SubscriptionEvent poll(final String consumerId) { if (prefetchingQueue.isEmpty()) { - tryPrefetch(); + tryPrefetch(true); } final long size = prefetchingQueue.size(); @@ -144,51 +142,39 @@ public SubscriptionEvent poll(final String consumerId) { * SubscriptionPrefetchingQueue#inputPendingQueue} to {@link * SubscriptionPrefetchingQueue#prefetchingQueue} */ - protected void tryPrefetch() { + protected void tryPrefetch(final boolean trySealBatchIfEmpty) { Event event; - while (Objects.nonNull( - event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()))) { - if (!(event instanceof EnrichedEvent)) { - LOGGER.warn( - "Subscription: SubscriptionPrefetchingQueue {} only support prefetch EnrichedEvent. Ignore {}.", - this, - event); - continue; - } - - if (event instanceof PipeTerminateEvent) { - LOGGER.info( - "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", - this, - event); - // commit directly - ((PipeTerminateEvent) event) - .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true); - continue; - } - if (event instanceof TabletInsertionEvent) { - if (onEvent((TabletInsertionEvent) event)) { - break; - } - } else if (event instanceof PipeTsFileInsertionEvent) { - if (onEvent((PipeTsFileInsertionEvent) event)) { - break; - } - } else { - // TODO: - // - PipeHeartbeatEvent: ignored? (may affect pipe metrics) - // - UserDefinedEnrichedEvent: ignored? - // - Others: events related to meta sync, safe to ignore - LOGGER.info( - "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.", - this, - event); - if (trySealBatch()) { - break; + while (!inputPendingQueue.isEmpty()) { + if (Objects.nonNull( + event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()))) { + if (event instanceof TabletInsertionEvent) { + if (onEvent((TabletInsertionEvent) event)) { + return; + } + } else if (event instanceof PipeTsFileInsertionEvent) { + if (onEvent((PipeTsFileInsertionEvent) event)) { + return; + } + } else { + // TODO: + // - PipeHeartbeatEvent: ignored? (may affect pipe metrics) + // - UserDefinedEnrichedEvent: ignored? + // - Others: events related to meta sync, safe to ignore + LOGGER.info( + "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.", + this, + event); + if (trySealBatch()) { + return; + } } } } + + if (trySealBatchIfEmpty) { + trySealBatch(); + } } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java index 3aa36425b88f2..b0f69da2e764b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java @@ -83,7 +83,7 @@ public void cleanup() { @Override public void executePrefetch() { - super.tryPrefetch(); + super.tryPrefetch(false); this.serializeEventsInQueue(); } @@ -114,13 +114,18 @@ private boolean onEventInternal(final EnrichedEvent event) { @Override protected boolean trySealBatch() { + final AtomicBoolean result = new AtomicBoolean(false); currentBatchRef.getAndUpdate( (batch) -> { - sealBatch(batch); - return new SubscriptionPipeTabletEventBatch( - BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); + if (batch.shouldEmit()) { + sealBatch(batch); + result.set(true); + return new SubscriptionPipeTabletEventBatch( + BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); + } + return batch; }); - return true; + return result.get(); } private void sealBatch(final SubscriptionPipeTabletEventBatch batch) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index b0aed50b8bf45..80b8da29fbc3a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -261,7 +261,7 @@ public SubscriptionEvent poll(final String consumerId) { @Override public synchronized void executePrefetch() { - super.tryPrefetch(); + super.tryPrefetch(false); // prefetch remaining subscription events based on {@link consumerIdToSubscriptionEventMap} for (final SubscriptionEvent event : consumerIdToSubscriptionEventMap.values()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java index d5741889b2a97..f301fef985b49 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java @@ -19,7 +19,9 @@ package org.apache.iotdb.db.subscription.broker; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.pipe.api.event.Event; @@ -49,22 +51,43 @@ public TsFileDeduplicationBlockingPendingQueue( } @Override - public synchronized Event waitedPoll() { // make it synchronized - final Event event = inputPendingQueue.waitedPoll(); + public Event waitedPoll() { + return filter(inputPendingQueue.waitedPoll()); + } + + private synchronized Event filter(final Event event) { // make it synchronized + if (Objects.isNull(event)) { + return null; + } + + if (!(event instanceof EnrichedEvent)) { + LOGGER.info("Subscription: Ignore {}", event); + return null; + } + + if (event instanceof PipeTerminateEvent) { + LOGGER.info("Subscription: Detect PipeTerminateEvent {}, commit it directly", event); + // commit directly + ((PipeTerminateEvent) event) + .decreaseReferenceCount(TsFileDeduplicationBlockingPendingQueue.class.getName(), true); + return null; + } + if (event instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) event; final int hashcode = pipeTsFileInsertionEvent.getTsFile().hashCode(); if (Objects.nonNull(polledTsFiles.getIfPresent(hashcode))) { - // commit directly LOGGER.info( "Subscription: Detect duplicated PipeTsFileInsertionEvent {}, commit it directly", pipeTsFileInsertionEvent.coreReportMessage()); + // commit directly pipeTsFileInsertionEvent.decreaseReferenceCount( TsFileDeduplicationBlockingPendingQueue.class.getName(), true); return null; } polledTsFiles.put(hashcode, hashcode); } + return event; } } From 34a4be520e6d05bc6e7ebc12f8990a1ca9e3c0a2 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 15 Jul 2024 10:51:46 +0800 Subject: [PATCH 2/8] fixup --- .../broker/SubscriptionPrefetchingQueue.java | 74 +++++++++++++------ ...FileDeduplicationBlockingPendingQueue.java | 15 ---- 2 files changed, 52 insertions(+), 37 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 320711b079d75..bf78c76e30dc2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -19,10 +19,12 @@ package org.apache.iotdb.db.subscription.broker; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; @@ -146,29 +148,57 @@ protected void tryPrefetch(final boolean trySealBatchIfEmpty) { Event event; while (!inputPendingQueue.isEmpty()) { - if (Objects.nonNull( - event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()))) { - if (event instanceof TabletInsertionEvent) { - if (onEvent((TabletInsertionEvent) event)) { - return; - } - } else if (event instanceof PipeTsFileInsertionEvent) { - if (onEvent((PipeTsFileInsertionEvent) event)) { - return; - } - } else { - // TODO: - // - PipeHeartbeatEvent: ignored? (may affect pipe metrics) - // - UserDefinedEnrichedEvent: ignored? - // - Others: events related to meta sync, safe to ignore - LOGGER.info( - "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.", - this, - event); - if (trySealBatch()) { - return; - } + event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()); + if (Objects.isNull(event)) { + // The event will be null in two cases: + // 1. The inputPendingQueue is empty. + // 2. The tsfile event has been deduplicated. + continue; + } + + if (!(event instanceof EnrichedEvent)) { + LOGGER.warn( + "Subscription: SubscriptionPrefetchingQueue {} only support prefetch EnrichedEvent. Ignore {}.", + this, + event); + continue; + } + + if (event instanceof PipeTerminateEvent) { + LOGGER.info( + "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", + this, + event); + // commit directly + ((PipeTerminateEvent) event) + .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true); + continue; + } + + if (event instanceof TabletInsertionEvent) { + if (onEvent((TabletInsertionEvent) event)) { + return; } + continue; + } + + if (event instanceof PipeTsFileInsertionEvent) { + if (onEvent((PipeTsFileInsertionEvent) event)) { + return; + } + continue; + } + + // TODO: + // - PipeHeartbeatEvent: ignored? (may affect pipe metrics) + // - UserDefinedEnrichedEvent: ignored? + // - Others: events related to meta sync, safe to ignore + LOGGER.info( + "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.", + this, + event); + if (trySealBatch()) { + return; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java index 9a952d40073c3..7929d2a0ee1fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java @@ -19,10 +19,8 @@ package org.apache.iotdb.db.subscription.broker; -import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; -import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.pipe.api.event.Event; @@ -63,19 +61,6 @@ private synchronized Event filter(final Event event) { // make it synchronized return null; } - if (!(event instanceof EnrichedEvent)) { - LOGGER.info("Subscription: Ignore {}", event); - return null; - } - - if (event instanceof PipeTerminateEvent) { - LOGGER.info("Subscription: Detect PipeTerminateEvent {}, commit it directly", event); - // commit directly - ((PipeTerminateEvent) event) - .decreaseReferenceCount(TsFileDeduplicationBlockingPendingQueue.class.getName(), true); - return null; - } - if (event instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) event; final int hashcode = pipeTsFileInsertionEvent.getTsFile().hashCode(); From fb0f89115f24861cb34b76f4fb841af52b1bef2f Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 15 Jul 2024 13:09:22 +0800 Subject: [PATCH 3/8] improve --- .../iotdb/rpc/subscription/config/ConsumerConstant.java | 6 +++--- .../session/subscription/consumer/SubscriptionConsumer.java | 2 +- .../manager/subscription/SubscriptionCoordinator.java | 6 +++++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index d449b2f9741a8..15156d66cc8cf 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -65,11 +65,11 @@ public class ConsumerConstant { public static final String CONSUME_LISTENER_KEY = "consume-listener"; public static final String AUTO_POLL_INTERVAL_MS_KEY = "auto-poll-interval-ms"; - public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 5_000L; - public static final long AUTO_POLL_INTERVAL_MS_MIN_VALUE = 500L; + public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 500L; + public static final long AUTO_POLL_INTERVAL_MS_MIN_VALUE = 100L; public static final String AUTO_POLL_TIMEOUT_MS_KEY = "auto-poll-timeout-ms"; - public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 10_000L; + public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 5_000L; public static final long AUTO_POLL_TIMEOUT_MS_MIN_VALUE = 1_000L; private ConsumerConstant() { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java index 633f63ae7a996..ddf8abee931fb 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java @@ -82,7 +82,7 @@ abstract class SubscriptionConsumer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConsumer.class); - private static final long SLEEP_NS = 1_000_000_000L; + private static final long SLEEP_NS = 100_000_000L; // 100ms private final String username; private final String password; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java index 92383f3c3b0e1..7586b532724eb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java @@ -51,6 +51,8 @@ public class SubscriptionCoordinator { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionCoordinator.class); private final ConfigManager configManager; + + // NEVER EXPOSE THIS DIRECTLY TO THE OUTSIDE private final SubscriptionInfo subscriptionInfo; private final PipeTaskCoordinatorLock coordinatorLock; @@ -110,7 +112,9 @@ public void stopSubscriptionMetaSync() { subscriptionMetaSyncer.stop(); } - /** Caller should ensure that the method is called in the lock {@link #tryLock}. */ + /** + * Caller should ensure that the method is called in the write lock of {@link #subscriptionInfo}. + */ public void updateLastSyncedVersion() { subscriptionInfo.updateLastSyncedVersion(); } From 7675ff2cd2cda83b2710c3280e1b4fe3df255c9c Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 15 Jul 2024 17:06:03 +0800 Subject: [PATCH 4/8] improve --- .../subscription/config/ConsumerConstant.java | 5 ++--- .../consumer/SubscriptionPushConsumer.java | 6 ++---- .../broker/SubscriptionPrefetchingQueue.java | 16 +++++++++++----- .../SubscriptionPrefetchingTabletQueue.java | 7 +++++-- .../SubscriptionPrefetchingTsFileQueue.java | 18 ++++++++++-------- 5 files changed, 30 insertions(+), 22 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index 15156d66cc8cf..3693e89c3bf5c 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -65,11 +65,10 @@ public class ConsumerConstant { public static final String CONSUME_LISTENER_KEY = "consume-listener"; public static final String AUTO_POLL_INTERVAL_MS_KEY = "auto-poll-interval-ms"; - public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 500L; - public static final long AUTO_POLL_INTERVAL_MS_MIN_VALUE = 100L; + public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 100L; public static final String AUTO_POLL_TIMEOUT_MS_KEY = "auto-poll-timeout-ms"; - public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 5_000L; + public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 10_000L; public static final long AUTO_POLL_TIMEOUT_MS_MIN_VALUE = 1_000L; private ConsumerConstant() { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java index e28cf3b7c7ea9..39ebc5aeaf990 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java @@ -102,8 +102,7 @@ private SubscriptionPushConsumer( this.ackStrategy = ackStrategy; this.consumeListener = consumeListener; - this.autoPollIntervalMs = - Math.max(autoPollIntervalMs, ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE); + this.autoPollIntervalMs = autoPollIntervalMs; this.autoPollTimeoutMs = Math.max(autoPollTimeoutMs, ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE); } @@ -290,8 +289,7 @@ public Builder consumeListener(final ConsumeListener consumeListener) { } public Builder autoPollIntervalMs(final long autoPollIntervalMs) { - this.autoPollIntervalMs = - Math.max(autoPollIntervalMs, ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE); + this.autoPollIntervalMs = autoPollIntervalMs; return this; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index bf78c76e30dc2..333c44e6f743e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -140,15 +140,20 @@ public SubscriptionEvent poll(final String consumerId) { public abstract void executePrefetch(); /** - * prefetch at most one {@link SubscriptionEvent} from {@link + * Prefetch at most one {@link SubscriptionEvent} from {@link * SubscriptionPrefetchingQueue#inputPendingQueue} to {@link - * SubscriptionPrefetchingQueue#prefetchingQueue} + * SubscriptionPrefetchingQueue#prefetchingQueue}. + * + *

It will continuously attempt to prefetch and generate a {@link SubscriptionEvent} until + * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty. + * + * @param trySealBatchIfEmpty {@code true} if {@link SubscriptionPrefetchingQueue#trySealBatch} is + * called when {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty, {@code false} + * otherwise */ protected void tryPrefetch(final boolean trySealBatchIfEmpty) { - Event event; - while (!inputPendingQueue.isEmpty()) { - event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()); + final Event event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()); if (Objects.isNull(event)) { // The event will be null in two cases: // 1. The inputPendingQueue is empty. @@ -202,6 +207,7 @@ protected void tryPrefetch(final boolean trySealBatchIfEmpty) { } } + // At this moment, the inputPendingQueue is empty. if (trySealBatchIfEmpty) { trySealBatch(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java index b0f69da2e764b..a3b1ebbcd39f7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java @@ -83,8 +83,11 @@ public void cleanup() { @Override public void executePrefetch() { - super.tryPrefetch(false); - this.serializeEventsInQueue(); + if (prefetchingQueue.isEmpty()) { + super.tryPrefetch(false); + } else { + this.serializeEventsInQueue(); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index 80b8da29fbc3a..c6372d4da5b53 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -261,14 +261,16 @@ public SubscriptionEvent poll(final String consumerId) { @Override public synchronized void executePrefetch() { - super.tryPrefetch(false); - - // prefetch remaining subscription events based on {@link consumerIdToSubscriptionEventMap} - for (final SubscriptionEvent event : consumerIdToSubscriptionEventMap.values()) { - try { - event.prefetchRemainingResponses(); - event.trySerializeRemainingResponses(); - } catch (final IOException ignored) { + if (prefetchingQueue.isEmpty() || consumerIdToSubscriptionEventMap.isEmpty()) { + super.tryPrefetch(false); + } else { + // prefetch remaining subscription events based on {@link consumerIdToSubscriptionEventMap} + for (final SubscriptionEvent event : consumerIdToSubscriptionEventMap.values()) { + try { + event.prefetchRemainingResponses(); + event.trySerializeRemainingResponses(); + } catch (final IOException ignored) { + } } } } From d75cf1eb2c83b970ca7cabfca2c17c6e0e8efd76 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 15 Jul 2024 18:05:07 +0800 Subject: [PATCH 5/8] optimize SubscriptionPrefetchingTsFileQueue lock contention --- .../broker/SubscriptionPrefetchingQueue.java | 1 + .../SubscriptionPrefetchingTabletQueue.java | 7 +- .../SubscriptionPrefetchingTsFileQueue.java | 155 +++++++++--------- .../subscription/event/SubscriptionEvent.java | 4 + .../SubscriptionPipeTabletEventBatch.java | 2 +- .../SubscriptionPipeTsFileEventBatch.java | 2 +- 6 files changed, 88 insertions(+), 83 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 333c44e6f743e..f88368d01c8e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -244,6 +244,7 @@ public boolean ack(final String consumerId, final SubscriptionCommitContext comm } if (event.isCommitted()) { + event.cleanup(); LOGGER.warn( "Subscription: subscription event {} is committed, subscription commit context {}, prefetching queue: {}", event, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java index a3b1ebbcd39f7..b0f69da2e764b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java @@ -83,11 +83,8 @@ public void cleanup() { @Override public void executePrefetch() { - if (prefetchingQueue.isEmpty()) { - super.tryPrefetch(false); - } else { - this.serializeEventsInQueue(); - } + super.tryPrefetch(false); + this.serializeEventsInQueue(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index c6372d4da5b53..e5a2c1b69e17e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -95,16 +95,11 @@ public void cleanup() { @Override public SubscriptionEvent poll(final String consumerId) { + // check before polling event from prefetching queue if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) { return null; } - final SubscriptionEvent pollableEvent = getPollableOnTheFlySubscriptionTsFileEvent(consumerId); - if (Objects.nonNull(pollableEvent)) { - return pollableEvent; - } - - // At this point, the event that might be polled should not have been polled by any consumer. final SubscriptionEvent event = super.poll(consumerId); if (Objects.nonNull(event)) { consumerIdToSubscriptionEventMap.put(consumerId, event); @@ -113,10 +108,20 @@ public SubscriptionEvent poll(final String consumerId) { return event; } - public synchronized @NonNull SubscriptionEvent pollTsFile( + public @NonNull SubscriptionEvent pollTsFile( final String consumerId, final String fileName, final long writingOffset) { // 1. Extract current event and check it - final SubscriptionEvent event = consumerIdToSubscriptionEventMap.get(consumerId); + final SubscriptionEvent event = + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.nonNull(ev) && ev.isCommitted()) { + ev.cleanup(); + return null; // remove this entry + } + return ev; + }); + if (Objects.isNull(event)) { final String errorMessage = String.format( @@ -126,17 +131,6 @@ public SubscriptionEvent poll(final String consumerId) { return generateSubscriptionPollErrorResponse(errorMessage); } - if (event.isCommitted()) { - event.cleanup(); - consumerIdToSubscriptionEventMap.remove(consumerId); - final String errorMessage = - String.format( - "SubscriptionEvent %s related to TsFile is committed, consumer: %s, writing offset: %s, prefetching queue: %s", - event, consumerId, writingOffset, this); - LOGGER.warn(errorMessage); - return generateSubscriptionPollErrorResponse(errorMessage); - } - // check consumer id if (!Objects.equals(event.getLastPolledConsumerId(), consumerId)) { final String errorMessage = @@ -260,17 +254,33 @@ public SubscriptionEvent poll(final String consumerId) { /////////////////////////////// prefetch /////////////////////////////// @Override - public synchronized void executePrefetch() { - if (prefetchingQueue.isEmpty() || consumerIdToSubscriptionEventMap.isEmpty()) { - super.tryPrefetch(false); - } else { - // prefetch remaining subscription events based on {@link consumerIdToSubscriptionEventMap} - for (final SubscriptionEvent event : consumerIdToSubscriptionEventMap.values()) { - try { - event.prefetchRemainingResponses(); - event.trySerializeRemainingResponses(); - } catch (final IOException ignored) { - } + public void executePrefetch() { + super.tryPrefetch(false); + + for (final Map.Entry entry : + consumerIdToSubscriptionEventMap.entrySet()) { + final String consumerId = entry.getKey(); + final SubscriptionEvent event = entry.getValue(); + + // clean up committed event + if (event.isCommitted()) { + event.cleanup(); + consumerIdToSubscriptionEventMap.remove(consumerId); + continue; + } + + // nack pollable event + if (event.pollable()) { + event.nack(); + consumerIdToSubscriptionEventMap.remove(consumerId); + continue; + } + + // prefetch and serialize remaining subscription events + try { + event.prefetchRemainingResponses(); + event.trySerializeRemainingResponses(); + } catch (final IOException ignored) { } } } @@ -357,16 +367,39 @@ private void sealBatch(final SubscriptionPipeTsFileEventBatch batch) throws Exce /////////////////////////////// commit /////////////////////////////// + /** + * @return {@code true} if ack successfully + */ + @Override + public boolean ack(final String consumerId, final SubscriptionCommitContext commitContext) { + if (super.ack(consumerId, commitContext)) { + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.nonNull(ev) && Objects.equals(commitContext, ev.getCommitContext())) { + return null; // remove this entry + } + return ev; + }); + return true; + } + return false; + } + /** * @return {@code true} if nack successfully */ @Override public boolean nack(final String consumerId, final SubscriptionCommitContext commitContext) { if (super.nack(consumerId, commitContext)) { - final SubscriptionEvent event = consumerIdToSubscriptionEventMap.get(consumerId); - if (Objects.nonNull(event) && Objects.equals(commitContext, event.getCommitContext())) { - consumerIdToSubscriptionEventMap.remove(consumerId); - } + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.nonNull(ev) && Objects.equals(commitContext, ev.getCommitContext())) { + return null; // remove this entry + } + return ev; + }); return true; } return false; @@ -374,20 +407,20 @@ public boolean nack(final String consumerId, final SubscriptionCommitContext com /////////////////////////////// utility /////////////////////////////// - private synchronized boolean hasUnPollableOnTheFlySubscriptionTsFileEvent( - final String consumerId) { - final SubscriptionEvent event = consumerIdToSubscriptionEventMap.get(consumerId); - if (Objects.isNull(event)) { - return false; - } + private boolean hasUnPollableOnTheFlySubscriptionTsFileEvent(final String consumerId) { + final SubscriptionEvent event = + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.nonNull(ev) && ev.isCommitted()) { + ev.cleanup(); + return null; // remove this entry + } - if (event.isCommitted()) { - event.cleanup(); - consumerIdToSubscriptionEventMap.remove(consumerId); - return false; - } + return ev; + }); - if (!event.pollable()) { + if (Objects.nonNull(event) && !event.pollable()) { LOGGER.info( "SubscriptionPrefetchingTsFileQueue {} is currently transferring TsFile (with event {}) to consumer {}", this, @@ -399,36 +432,6 @@ private synchronized boolean hasUnPollableOnTheFlySubscriptionTsFileEvent( return false; } - private synchronized SubscriptionEvent getPollableOnTheFlySubscriptionTsFileEvent( - final String consumerId) { - for (final Map.Entry entry : - consumerIdToSubscriptionEventMap.entrySet()) { - final SubscriptionEvent event = entry.getValue(); - if (event.isCommitted()) { - event.cleanup(); - consumerIdToSubscriptionEventMap.remove(entry.getKey()); - continue; - } - - if (!event.pollable()) { - continue; - } - - // uncommitted and pollable event - - consumerIdToSubscriptionEventMap.remove(entry.getKey()); - - event.nack(); - consumerIdToSubscriptionEventMap.put(consumerId, event); - - event.recordLastPolledConsumerId(consumerId); - event.recordLastPolledTimestamp(); - return event; - } - - return null; - } - private SubscriptionEvent generateSubscriptionPollErrorResponse(final String errorMessage) { // consider non-critical by default, meaning the client can retry return super.generateSubscriptionPollErrorResponse(errorMessage, false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index b8926a80316bc..9a1df3cb4918f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -120,6 +120,7 @@ public void recordCommittedTimestamp() { committedTimestamp = System.currentTimeMillis(); } + /** NOTE: {@link SubscriptionEvent#cleanup} should be called immediately if event is committed */ public boolean isCommitted() { if (commitContext.getCommitId() == INVALID_COMMIT_ID) { // event with invalid commit id is committed @@ -162,6 +163,9 @@ public boolean pollable() { if (lastPolledTimestamp == INVALID_TIMESTAMP) { return true; } + if (Objects.nonNull(lastPolledConsumerId)) { + return false; + } // Recycle events that may not be able to be committed, i.e., those that have been polled but // not committed within a certain period of time. return System.currentTimeMillis() - lastPolledTimestamp diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index e9ea7f423e048..892bcaabb68c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -103,7 +103,7 @@ public synchronized boolean onEvent(final EnrichedEvent event) { return shouldEmit(); } - public void ack() { + public synchronized void ack() { for (final EnrichedEvent enrichedEvent : enrichedEvents) { enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index 1e0f91ae787b0..93f6a9464b40a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -46,7 +46,7 @@ public synchronized boolean onEvent(final TabletInsertionEvent event) throws Exc return batch.onEvent(event); } - public void ack() { + public synchronized void ack() { batch.decreaseEventsReferenceCount(this.getClass().getName(), true); } From 8e6107d40dcdf69e82cbf6b241fb646428967469 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 15 Jul 2024 18:51:07 +0800 Subject: [PATCH 6/8] improve --- .../SubscriptionPrefetchingTsFileQueue.java | 58 +++++++++++-------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index e5a2c1b69e17e..84fdb3661e7de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -33,6 +33,7 @@ import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; +import com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -257,31 +259,37 @@ public SubscriptionEvent poll(final String consumerId) { public void executePrefetch() { super.tryPrefetch(false); - for (final Map.Entry entry : - consumerIdToSubscriptionEventMap.entrySet()) { - final String consumerId = entry.getKey(); - final SubscriptionEvent event = entry.getValue(); - - // clean up committed event - if (event.isCommitted()) { - event.cleanup(); - consumerIdToSubscriptionEventMap.remove(consumerId); - continue; - } - - // nack pollable event - if (event.pollable()) { - event.nack(); - consumerIdToSubscriptionEventMap.remove(consumerId); - continue; - } - - // prefetch and serialize remaining subscription events - try { - event.prefetchRemainingResponses(); - event.trySerializeRemainingResponses(); - } catch (final IOException ignored) { - } + // iterate on the snapshot of the key set + final Set consumerIds = ImmutableSet.copyOf(consumerIdToSubscriptionEventMap.keySet()); + for (final String consumerId : consumerIds) { + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.isNull(ev)) { + return null; + } + + // clean up committed event + if (ev.isCommitted()) { + ev.cleanup(); + return null; // remove this entry + } + + // nack pollable event + if (ev.pollable()) { + ev.nack(); + return null; // remove this entry + } + + // prefetch and serialize remaining subscription events + try { + ev.prefetchRemainingResponses(); + ev.trySerializeRemainingResponses(); + } catch (final IOException ignored) { + } + + return ev; + }); } } From a741ec9f9bbaa39c5fd2cc1e452bf4d649f0324f Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 15 Jul 2024 18:54:32 +0800 Subject: [PATCH 7/8] avoid autoPollIntervalMs set to zero --- .../subscription/consumer/SubscriptionPushConsumer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java index 39ebc5aeaf990..5143958b89a9c 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java @@ -102,7 +102,7 @@ private SubscriptionPushConsumer( this.ackStrategy = ackStrategy; this.consumeListener = consumeListener; - this.autoPollIntervalMs = autoPollIntervalMs; + this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1); this.autoPollTimeoutMs = Math.max(autoPollTimeoutMs, ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE); } @@ -289,7 +289,7 @@ public Builder consumeListener(final ConsumeListener consumeListener) { } public Builder autoPollIntervalMs(final long autoPollIntervalMs) { - this.autoPollIntervalMs = autoPollIntervalMs; + this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1); return this; } From e7f8b2becd169a9fa6de97b2f1e494eec45053a9 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Tue, 16 Jul 2024 11:19:37 +0800 Subject: [PATCH 8/8] improve --- .../broker/SubscriptionPrefetchingTsFileQueue.java | 5 +++++ .../iotdb/db/subscription/event/SubscriptionEvent.java | 6 +++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index 84fdb3661e7de..26fb19846b10e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -261,6 +261,9 @@ public void executePrefetch() { // iterate on the snapshot of the key set final Set consumerIds = ImmutableSet.copyOf(consumerIdToSubscriptionEventMap.keySet()); + // NOTE: + // 1. Ignore entries added during iteration. + // 2. For entries deleted by other threads during iteration, just check if the value is null. for (final String consumerId : consumerIds) { consumerIdToSubscriptionEventMap.compute( consumerId, @@ -282,6 +285,8 @@ public void executePrefetch() { } // prefetch and serialize remaining subscription events + // NOTE: Since the compute call for the same key is atomic and will be executed + // serially, the current prefetch and serialize operations are safe. try { ev.prefetchRemainingResponses(); ev.trySerializeRemainingResponses(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index 9a1df3cb4918f..36e9804dda4ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -106,7 +106,7 @@ public SubscriptionPollResponse getCurrentResponse() { return getResponse(currentResponseIndex); } - public SubscriptionPollResponse getResponse(final int index) { + private SubscriptionPollResponse getResponse(final int index) { return responses[index]; } @@ -269,7 +269,7 @@ public boolean trySerializeCurrentResponse() { * @param index the index of response to be serialized * @return {@code true} if a serialization operation was actually performed */ - public boolean trySerializeResponse(final int index) { + private boolean trySerializeResponse(final int index) { if (index >= responses.length) { return false; } @@ -315,7 +315,7 @@ public void resetResponseByteBuffer(final boolean resetAll) { /////////////////////////////// tsfile /////////////////////////////// - public @NonNull SubscriptionPollResponse generateSubscriptionPollResponseWithPieceOrSealPayload( + private @NonNull SubscriptionPollResponse generateSubscriptionPollResponseWithPieceOrSealPayload( final long writingOffset) throws IOException { final File tsFile = pipeEvents.getTsFile();