Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class SubscriptionEvent {
private final SubscriptionCommitContext
commitContext; // all responses have the same commit context

// lastPolledConsumerId is not used as a criterion for determining pollability
private String lastPolledConsumerId;
private long lastPolledTimestamp;
private long committedTimestamp;
Expand Down Expand Up @@ -163,20 +164,18 @@ public boolean pollable() {
if (lastPolledTimestamp == INVALID_TIMESTAMP) {
return true;
}
if (Objects.nonNull(lastPolledConsumerId)) {
return false;
if (canRecycle()) {
nack();
return true;
}
// Recycle events that may not be able to be committed, i.e., those that have been polled but
// not committed within a certain period of time.
return System.currentTimeMillis() - lastPolledTimestamp
> SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs();
return false;
}

public void nack() {
// reset current response index
currentResponseIndex = 0;

lastPolledConsumerId = null;
// reset lastPolledTimestamp makes this event pollable
lastPolledTimestamp = INVALID_TIMESTAMP;
}

Expand All @@ -188,6 +187,13 @@ public String getLastPolledConsumerId() {
return lastPolledConsumerId;
}

private boolean canRecycle() {
// Recycle events that may not be able to be committed, i.e., those that have been polled but
// not committed within a certain period of time.
return System.currentTimeMillis() - lastPolledTimestamp
> SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs();
}

//////////////////////////// prefetch & fetch ////////////////////////////

/**
Expand Down