diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index d449b2f9741a8..3693e89c3bf5c 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -65,8 +65,7 @@ public class ConsumerConstant { public static final String CONSUME_LISTENER_KEY = "consume-listener"; public static final String AUTO_POLL_INTERVAL_MS_KEY = "auto-poll-interval-ms"; - public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 5_000L; - public static final long AUTO_POLL_INTERVAL_MS_MIN_VALUE = 500L; + public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 100L; public static final String AUTO_POLL_TIMEOUT_MS_KEY = "auto-poll-timeout-ms"; public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 10_000L; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java index 633f63ae7a996..ddf8abee931fb 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java @@ -82,7 +82,7 @@ abstract class SubscriptionConsumer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConsumer.class); - private static final long SLEEP_NS = 1_000_000_000L; + private static final long SLEEP_NS = 100_000_000L; // 100ms private final String username; private final String password; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java index e28cf3b7c7ea9..5143958b89a9c 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java @@ -102,8 +102,7 @@ private SubscriptionPushConsumer( this.ackStrategy = ackStrategy; this.consumeListener = consumeListener; - this.autoPollIntervalMs = - Math.max(autoPollIntervalMs, ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE); + this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1); this.autoPollTimeoutMs = Math.max(autoPollTimeoutMs, ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE); } @@ -290,8 +289,7 @@ public Builder consumeListener(final ConsumeListener consumeListener) { } public Builder autoPollIntervalMs(final long autoPollIntervalMs) { - this.autoPollIntervalMs = - Math.max(autoPollIntervalMs, ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE); + this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1); return this; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java index 2d72f3c6c70db..c48f3e0f07813 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java @@ -51,6 +51,8 @@ public class SubscriptionCoordinator { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionCoordinator.class); private final ConfigManager configManager; + + // NEVER EXPOSE THIS DIRECTLY TO THE OUTSIDE private final SubscriptionInfo subscriptionInfo; private final PipeTaskCoordinatorLock coordinatorLock; @@ -122,7 +124,9 @@ public void stopSubscriptionMetaSync() { subscriptionMetaSyncer.stop(); } - /** Caller should ensure that the method is called in the lock {@link #tryLock}. */ + /** + * Caller should ensure that the method is called in the write lock of {@link #subscriptionInfo}. + */ public void updateLastSyncedVersion() { subscriptionInfo.updateLastSyncedVersion(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 2d23950fe752f..f88368d01c8e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -93,7 +93,7 @@ public void cleanup() { public SubscriptionEvent poll(final String consumerId) { if (prefetchingQueue.isEmpty()) { - tryPrefetch(); + tryPrefetch(true); } final long size = prefetchingQueue.size(); @@ -140,14 +140,27 @@ public SubscriptionEvent poll(final String consumerId) { public abstract void executePrefetch(); /** - * prefetch at most one {@link SubscriptionEvent} from {@link + * Prefetch at most one {@link SubscriptionEvent} from {@link * SubscriptionPrefetchingQueue#inputPendingQueue} to {@link - * SubscriptionPrefetchingQueue#prefetchingQueue} + * SubscriptionPrefetchingQueue#prefetchingQueue}. + * + *

It will continuously attempt to prefetch and generate a {@link SubscriptionEvent} until + * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty. + * + * @param trySealBatchIfEmpty {@code true} if {@link SubscriptionPrefetchingQueue#trySealBatch} is + * called when {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty, {@code false} + * otherwise */ - protected void tryPrefetch() { - Event event; - while (Objects.nonNull( - event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()))) { + protected void tryPrefetch(final boolean trySealBatchIfEmpty) { + while (!inputPendingQueue.isEmpty()) { + final Event event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()); + if (Objects.isNull(event)) { + // The event will be null in two cases: + // 1. The inputPendingQueue is empty. + // 2. The tsfile event has been deduplicated. + continue; + } + if (!(event instanceof EnrichedEvent)) { LOGGER.warn( "Subscription: SubscriptionPrefetchingQueue {} only support prefetch EnrichedEvent. Ignore {}.", @@ -169,26 +182,35 @@ protected void tryPrefetch() { if (event instanceof TabletInsertionEvent) { if (onEvent((TabletInsertionEvent) event)) { - break; + return; } - } else if (event instanceof PipeTsFileInsertionEvent) { + continue; + } + + if (event instanceof PipeTsFileInsertionEvent) { if (onEvent((PipeTsFileInsertionEvent) event)) { - break; - } - } else { - // TODO: - // - PipeHeartbeatEvent: ignored? (may affect pipe metrics) - // - UserDefinedEnrichedEvent: ignored? - // - Others: events related to meta sync, safe to ignore - LOGGER.info( - "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.", - this, - event); - if (trySealBatch()) { - break; + return; } + continue; + } + + // TODO: + // - PipeHeartbeatEvent: ignored? (may affect pipe metrics) + // - UserDefinedEnrichedEvent: ignored? + // - Others: events related to meta sync, safe to ignore + LOGGER.info( + "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.", + this, + event); + if (trySealBatch()) { + return; } } + + // At this moment, the inputPendingQueue is empty. + if (trySealBatchIfEmpty) { + trySealBatch(); + } } /** @@ -222,6 +244,7 @@ public boolean ack(final String consumerId, final SubscriptionCommitContext comm } if (event.isCommitted()) { + event.cleanup(); LOGGER.warn( "Subscription: subscription event {} is committed, subscription commit context {}, prefetching queue: {}", event, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java index 3aa36425b88f2..b0f69da2e764b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java @@ -83,7 +83,7 @@ public void cleanup() { @Override public void executePrefetch() { - super.tryPrefetch(); + super.tryPrefetch(false); this.serializeEventsInQueue(); } @@ -114,13 +114,18 @@ private boolean onEventInternal(final EnrichedEvent event) { @Override protected boolean trySealBatch() { + final AtomicBoolean result = new AtomicBoolean(false); currentBatchRef.getAndUpdate( (batch) -> { - sealBatch(batch); - return new SubscriptionPipeTabletEventBatch( - BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); + if (batch.shouldEmit()) { + sealBatch(batch); + result.set(true); + return new SubscriptionPipeTabletEventBatch( + BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); + } + return batch; }); - return true; + return result.get(); } private void sealBatch(final SubscriptionPipeTabletEventBatch batch) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index b0aed50b8bf45..26fb19846b10e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -33,6 +33,7 @@ import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; +import com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -95,16 +97,11 @@ public void cleanup() { @Override public SubscriptionEvent poll(final String consumerId) { + // check before polling event from prefetching queue if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) { return null; } - final SubscriptionEvent pollableEvent = getPollableOnTheFlySubscriptionTsFileEvent(consumerId); - if (Objects.nonNull(pollableEvent)) { - return pollableEvent; - } - - // At this point, the event that might be polled should not have been polled by any consumer. final SubscriptionEvent event = super.poll(consumerId); if (Objects.nonNull(event)) { consumerIdToSubscriptionEventMap.put(consumerId, event); @@ -113,10 +110,20 @@ public SubscriptionEvent poll(final String consumerId) { return event; } - public synchronized @NonNull SubscriptionEvent pollTsFile( + public @NonNull SubscriptionEvent pollTsFile( final String consumerId, final String fileName, final long writingOffset) { // 1. Extract current event and check it - final SubscriptionEvent event = consumerIdToSubscriptionEventMap.get(consumerId); + final SubscriptionEvent event = + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.nonNull(ev) && ev.isCommitted()) { + ev.cleanup(); + return null; // remove this entry + } + return ev; + }); + if (Objects.isNull(event)) { final String errorMessage = String.format( @@ -126,17 +133,6 @@ public SubscriptionEvent poll(final String consumerId) { return generateSubscriptionPollErrorResponse(errorMessage); } - if (event.isCommitted()) { - event.cleanup(); - consumerIdToSubscriptionEventMap.remove(consumerId); - final String errorMessage = - String.format( - "SubscriptionEvent %s related to TsFile is committed, consumer: %s, writing offset: %s, prefetching queue: %s", - event, consumerId, writingOffset, this); - LOGGER.warn(errorMessage); - return generateSubscriptionPollErrorResponse(errorMessage); - } - // check consumer id if (!Objects.equals(event.getLastPolledConsumerId(), consumerId)) { final String errorMessage = @@ -260,16 +256,45 @@ public SubscriptionEvent poll(final String consumerId) { /////////////////////////////// prefetch /////////////////////////////// @Override - public synchronized void executePrefetch() { - super.tryPrefetch(); - - // prefetch remaining subscription events based on {@link consumerIdToSubscriptionEventMap} - for (final SubscriptionEvent event : consumerIdToSubscriptionEventMap.values()) { - try { - event.prefetchRemainingResponses(); - event.trySerializeRemainingResponses(); - } catch (final IOException ignored) { - } + public void executePrefetch() { + super.tryPrefetch(false); + + // iterate on the snapshot of the key set + final Set consumerIds = ImmutableSet.copyOf(consumerIdToSubscriptionEventMap.keySet()); + // NOTE: + // 1. Ignore entries added during iteration. + // 2. For entries deleted by other threads during iteration, just check if the value is null. + for (final String consumerId : consumerIds) { + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.isNull(ev)) { + return null; + } + + // clean up committed event + if (ev.isCommitted()) { + ev.cleanup(); + return null; // remove this entry + } + + // nack pollable event + if (ev.pollable()) { + ev.nack(); + return null; // remove this entry + } + + // prefetch and serialize remaining subscription events + // NOTE: Since the compute call for the same key is atomic and will be executed + // serially, the current prefetch and serialize operations are safe. + try { + ev.prefetchRemainingResponses(); + ev.trySerializeRemainingResponses(); + } catch (final IOException ignored) { + } + + return ev; + }); } } @@ -355,16 +380,39 @@ private void sealBatch(final SubscriptionPipeTsFileEventBatch batch) throws Exce /////////////////////////////// commit /////////////////////////////// + /** + * @return {@code true} if ack successfully + */ + @Override + public boolean ack(final String consumerId, final SubscriptionCommitContext commitContext) { + if (super.ack(consumerId, commitContext)) { + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.nonNull(ev) && Objects.equals(commitContext, ev.getCommitContext())) { + return null; // remove this entry + } + return ev; + }); + return true; + } + return false; + } + /** * @return {@code true} if nack successfully */ @Override public boolean nack(final String consumerId, final SubscriptionCommitContext commitContext) { if (super.nack(consumerId, commitContext)) { - final SubscriptionEvent event = consumerIdToSubscriptionEventMap.get(consumerId); - if (Objects.nonNull(event) && Objects.equals(commitContext, event.getCommitContext())) { - consumerIdToSubscriptionEventMap.remove(consumerId); - } + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.nonNull(ev) && Objects.equals(commitContext, ev.getCommitContext())) { + return null; // remove this entry + } + return ev; + }); return true; } return false; @@ -372,20 +420,20 @@ public boolean nack(final String consumerId, final SubscriptionCommitContext com /////////////////////////////// utility /////////////////////////////// - private synchronized boolean hasUnPollableOnTheFlySubscriptionTsFileEvent( - final String consumerId) { - final SubscriptionEvent event = consumerIdToSubscriptionEventMap.get(consumerId); - if (Objects.isNull(event)) { - return false; - } + private boolean hasUnPollableOnTheFlySubscriptionTsFileEvent(final String consumerId) { + final SubscriptionEvent event = + consumerIdToSubscriptionEventMap.compute( + consumerId, + (id, ev) -> { + if (Objects.nonNull(ev) && ev.isCommitted()) { + ev.cleanup(); + return null; // remove this entry + } - if (event.isCommitted()) { - event.cleanup(); - consumerIdToSubscriptionEventMap.remove(consumerId); - return false; - } + return ev; + }); - if (!event.pollable()) { + if (Objects.nonNull(event) && !event.pollable()) { LOGGER.info( "SubscriptionPrefetchingTsFileQueue {} is currently transferring TsFile (with event {}) to consumer {}", this, @@ -397,36 +445,6 @@ private synchronized boolean hasUnPollableOnTheFlySubscriptionTsFileEvent( return false; } - private synchronized SubscriptionEvent getPollableOnTheFlySubscriptionTsFileEvent( - final String consumerId) { - for (final Map.Entry entry : - consumerIdToSubscriptionEventMap.entrySet()) { - final SubscriptionEvent event = entry.getValue(); - if (event.isCommitted()) { - event.cleanup(); - consumerIdToSubscriptionEventMap.remove(entry.getKey()); - continue; - } - - if (!event.pollable()) { - continue; - } - - // uncommitted and pollable event - - consumerIdToSubscriptionEventMap.remove(entry.getKey()); - - event.nack(); - consumerIdToSubscriptionEventMap.put(consumerId, event); - - event.recordLastPolledConsumerId(consumerId); - event.recordLastPolledTimestamp(); - return event; - } - - return null; - } - private SubscriptionEvent generateSubscriptionPollErrorResponse(final String errorMessage) { // consider non-critical by default, meaning the client can retry return super.generateSubscriptionPollErrorResponse(errorMessage, false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index b8926a80316bc..36e9804dda4ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -106,7 +106,7 @@ public SubscriptionPollResponse getCurrentResponse() { return getResponse(currentResponseIndex); } - public SubscriptionPollResponse getResponse(final int index) { + private SubscriptionPollResponse getResponse(final int index) { return responses[index]; } @@ -120,6 +120,7 @@ public void recordCommittedTimestamp() { committedTimestamp = System.currentTimeMillis(); } + /** NOTE: {@link SubscriptionEvent#cleanup} should be called immediately if event is committed */ public boolean isCommitted() { if (commitContext.getCommitId() == INVALID_COMMIT_ID) { // event with invalid commit id is committed @@ -162,6 +163,9 @@ public boolean pollable() { if (lastPolledTimestamp == INVALID_TIMESTAMP) { return true; } + if (Objects.nonNull(lastPolledConsumerId)) { + return false; + } // Recycle events that may not be able to be committed, i.e., those that have been polled but // not committed within a certain period of time. return System.currentTimeMillis() - lastPolledTimestamp @@ -265,7 +269,7 @@ public boolean trySerializeCurrentResponse() { * @param index the index of response to be serialized * @return {@code true} if a serialization operation was actually performed */ - public boolean trySerializeResponse(final int index) { + private boolean trySerializeResponse(final int index) { if (index >= responses.length) { return false; } @@ -311,7 +315,7 @@ public void resetResponseByteBuffer(final boolean resetAll) { /////////////////////////////// tsfile /////////////////////////////// - public @NonNull SubscriptionPollResponse generateSubscriptionPollResponseWithPieceOrSealPayload( + private @NonNull SubscriptionPollResponse generateSubscriptionPollResponseWithPieceOrSealPayload( final long writingOffset) throws IOException { final File tsFile = pipeEvents.getTsFile(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index e9ea7f423e048..892bcaabb68c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -103,7 +103,7 @@ public synchronized boolean onEvent(final EnrichedEvent event) { return shouldEmit(); } - public void ack() { + public synchronized void ack() { for (final EnrichedEvent enrichedEvent : enrichedEvents) { enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index 1e0f91ae787b0..93f6a9464b40a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -46,7 +46,7 @@ public synchronized boolean onEvent(final TabletInsertionEvent event) throws Exc return batch.onEvent(event); } - public void ack() { + public synchronized void ack() { batch.decreaseEventsReferenceCount(this.getClass().getName(), true); }