diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index aa68a522b22cf..5a9d886893102 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -58,6 +58,7 @@ public class SubscriptionEvent { private final SubscriptionCommitContext commitContext; // all responses have the same commit context + // lastPolledConsumerId is not used as a criterion for determining pollability private String lastPolledConsumerId; private long lastPolledTimestamp; private long committedTimestamp; @@ -163,20 +164,18 @@ public boolean pollable() { if (lastPolledTimestamp == INVALID_TIMESTAMP) { return true; } - if (Objects.nonNull(lastPolledConsumerId)) { - return false; + if (canRecycle()) { + nack(); + return true; } - // Recycle events that may not be able to be committed, i.e., those that have been polled but - // not committed within a certain period of time. - return System.currentTimeMillis() - lastPolledTimestamp - > SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs(); + return false; } public void nack() { // reset current response index currentResponseIndex = 0; - lastPolledConsumerId = null; + // reset lastPolledTimestamp makes this event pollable lastPolledTimestamp = INVALID_TIMESTAMP; } @@ -188,6 +187,13 @@ public String getLastPolledConsumerId() { return lastPolledConsumerId; } + private boolean canRecycle() { + // Recycle events that may not be able to be committed, i.e., those that have been polled but + // not committed within a certain period of time. + return System.currentTimeMillis() - lastPolledTimestamp + > SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs(); + } + //////////////////////////// prefetch & fetch //////////////////////////// /**