Skip to content

Commit

Permalink
[fix][broker] Fix topic status for oldestBacklogMessageAgeSeconds con…
Browse files Browse the repository at this point in the history
…tinuously increases even when there is no backlog. (#22907)
  • Loading branch information
shibd committed Jun 14, 2024
1 parent 7a21918 commit 6831231
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions: " + subscriptions.keys()));
} else if (failIfHasBacklogs) {
if (hasBacklogs()) {
if (hasBacklogs(false)) {
List<String> backlogSubs =
subscriptions.values().stream()
.filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0)
Expand Down Expand Up @@ -2638,12 +2638,9 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.backlogQuotaLimitTime = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();

TimeBasedBacklogQuotaCheckResult backlogQuotaCheckResult = timeBasedBacklogQuotaCheckResult;
stats.oldestBacklogMessageAgeSeconds = (backlogQuotaCheckResult == null)
? (long) -1
: TimeUnit.MILLISECONDS.toSeconds(
Clock.systemUTC().millis() - backlogQuotaCheckResult.getPositionPublishTimestampInMillis());

stats.oldestBacklogMessageAgeSeconds = getBestEffortOldestUnacknowledgedMessageAgeSeconds();
stats.oldestBacklogMessageSubscriptionName = (backlogQuotaCheckResult == null)
|| !hasBacklogs(getStatsOptions.isGetPreciseBacklog())
? null
: backlogQuotaCheckResult.getCursorName();

Expand Down Expand Up @@ -2906,7 +2903,7 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) {
}
break;
case delete_when_subscriptions_caught_up:
if (hasBacklogs()) {
if (hasBacklogs(false)) {
return true;
}
break;
Expand All @@ -2919,8 +2916,8 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) {
}
}

private boolean hasBacklogs() {
return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog(false) > 0);
private boolean hasBacklogs(boolean getPreciseBacklog) {
return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog(getPreciseBacklog) > 0);
}

@Override
Expand Down Expand Up @@ -3466,6 +3463,9 @@ public boolean isSizeBacklogExceeded() {

@Override
public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() {
if (!hasBacklogs(false)) {
return 0;
}
TimeBasedBacklogQuotaCheckResult result = timeBasedBacklogQuotaCheckResult;
if (result == null) {
return -1;
Expand Down Expand Up @@ -3553,6 +3553,9 @@ public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
}

if (brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) {
if (!hasBacklogs(true)) {
return CompletableFuture.completedFuture(false);
}
CompletableFuture<Boolean> future = new CompletableFuture<>();
// Check if first unconsumed message(first message after mark delete position)
// for slowest cursor's has expired.
Expand Down Expand Up @@ -3606,6 +3609,9 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
return future;
} else {
try {
if (!hasBacklogs(false)) {
return CompletableFuture.completedFuture(false);
}
EstimateTimeBasedBacklogQuotaCheckResult checkResult =
estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition);
if (checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.assertj.core.api.AssertionsForClassTypes.within;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -296,8 +297,12 @@ public void testBacklogQuotaWithReader() throws Exception {
}

private TopicStats getTopicStats(String topic1) throws PulsarAdminException {
return getTopicStats(topic1, true);
}

private TopicStats getTopicStats(String topic1, boolean getPreciseBacklog) throws PulsarAdminException {
TopicStats stats =
admin.topics().getStats(topic1, GetStatsOptions.builder().getPreciseBacklog(true).build());
admin.topics().getStats(topic1, GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog).build());
return stats;
}

Expand Down Expand Up @@ -502,9 +507,117 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce

// Cache should be used, since position hasn't changed
assertThat(getReadEntries(topic1)).isEqualTo(readEntries);

// Move subscription 1 and 2 to end
Message<byte[]> msg = consumer1.receive();
consumer1.acknowledge(msg);
consumer2.acknowledge(secondOldestMessage);
for (int i = 0; i < 2; i++) {
Message<byte[]> message = consumer2.receive();
log.info("Subscription 2 about to ack message ID {}", message.getMessageId());
consumer2.acknowledge(message);
}

log.info("Subscription 1 and 2 moved to end. Now should not backlog");
waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
waitForQuotaCheckToRunTwice();

topicStats = getTopicStats(topic1);
assertThat(topicStats.getBacklogSize()).isEqualTo(0);
assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog()).isEqualTo(0);
assertThat(topicStats.getSubscriptions().get(subName2).getMsgBacklog()).isEqualTo(0);
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0);
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();

metrics = prometheusMetricsClient.getMetrics();
backlogAgeMetric =
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
Pair.of("topic", topic1));
assertThat(backlogAgeMetric.tags).containsExactly(
entry("cluster", CLUSTER_NAME),
entry("namespace", namespace),
entry("topic", topic1));
assertThat((long) backlogAgeMetric.value).isEqualTo(0);

// producer should create success.
Producer<byte[]> producer2 = createProducer(client, topic1);
assertNotNull(producer2);
}
}

@Test
public void backlogsStatsPreciseWithNoBacklog() throws PulsarAdminException, PulsarClientException, InterruptedException {
config.setPreciseTimeBasedBacklogQuotaCheck(true);
config.setExposePreciseBacklogInPrometheus(true);
final String namespace = "prop/ns-quota";
assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>());
final int timeLimitSeconds = 2;
admin.namespaces().setBacklogQuota(
namespace,
BacklogQuota.builder()
.limitTime(timeLimitSeconds)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build(),
message_age);

try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.maxBackoffInterval(5, SECONDS)
.statsInterval(0, SECONDS).build()) {
final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();

final String subName1 = "c1";
final int numMsgs = 4;

Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1)
.acknowledgmentGroupTime(0, SECONDS)
.subscribe();
Producer<byte[]> producer = createProducer(client, topic1);

byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
MessageId send = producer.send(content);
System.out.println(i + ":msg:" + MILLISECONDS.toSeconds(System.currentTimeMillis()));
}

String c1MarkDeletePositionBefore =
admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;

// Move subscription 1 to end
for (int i = 0; i < numMsgs; i++) {
Message<byte[]> message1 = consumer1.receive();
consumer1.acknowledge(message1);
}

// This code will wait about 4~5 Seconds, to make sure the oldest message is 4~5 seconds old
c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
waitForQuotaCheckToRunTwice();

Metrics metrics = prometheusMetricsClient.getMetrics();
TopicStats topicStats = getTopicStats(topic1);

assertThat(topicStats.getBacklogQuotaLimitTime()).isEqualTo(timeLimitSeconds);
assertThat(topicStats.getBacklogSize()).isEqualTo(0);
assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog()).isEqualTo(0);
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0);
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();

Metric backlogAgeMetric =
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
Pair.of("topic", topic1));
assertThat(backlogAgeMetric.tags).containsExactly(
entry("cluster", CLUSTER_NAME),
entry("namespace", namespace),
entry("topic", topic1));
assertThat((long) backlogAgeMetric.value).isEqualTo(0);

// producer should create success.
Producer<byte[]> producer2 = createProducer(client, topic1);
assertNotNull(producer2);
}
config.setPreciseTimeBasedBacklogQuotaCheck(false);
config.setExposePreciseBacklogInPrometheus(false);
}

private long getReadEntries(String topic1) {
return ((PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get())
.getManagedLedger().getStats().getEntriesReadTotalCount();
Expand Down Expand Up @@ -609,6 +722,71 @@ public void backlogsStatsNotPrecise() throws PulsarAdminException, PulsarClientE
}
}

@Test
public void backlogsStatsNotPreciseWithNoBacklog() throws PulsarAdminException, PulsarClientException, InterruptedException {
config.setPreciseTimeBasedBacklogQuotaCheck(false);
config.setExposePreciseBacklogInPrometheus(false);
config.setManagedLedgerMaxEntriesPerLedger(6);
final String namespace = "prop/ns-quota";
assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>());
final int timeLimitSeconds = 2;
admin.namespaces().setBacklogQuota(
namespace,
BacklogQuota.builder()
.limitTime(timeLimitSeconds)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build(),
message_age);

try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.maxBackoffInterval(3, SECONDS)
.statsInterval(0, SECONDS).build()) {
final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();

final String subName1 = "brandNewC1";
final int numMsgs = 5;

Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1)
.acknowledgmentGroupTime(0, SECONDS)
.isAckReceiptEnabled(true)
.subscribe();
Producer<byte[]> producer = createProducer(client, topic1);

byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
}

String c1MarkDeletePositionBefore =
admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;

log.info("Moved subscription 1 to end");
for (int i = 0; i < numMsgs; i++) {
consumer1.acknowledge(consumer1.receive());
}

c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
waitForQuotaCheckToRunTwice();

// backlog and backlogAceSeconds should be 0
TopicStats topicStats = getTopicStats(topic1, false);
Metrics metrics = prometheusMetricsClient.getMetrics();
assertEquals(topicStats.getSubscriptions().get(subName1).getMsgBacklog(), 0);
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0);
Metric backlogAgeMetric =
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
Pair.of("topic", topic1));
assertThat(backlogAgeMetric.value).isEqualTo(0);

// producer should create success.
Producer<byte[]> producer2 = createProducer(client, topic1);
assertNotNull(producer2);

config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
}
}

private void unloadAndLoadTopic(String topic, Producer producer) throws PulsarAdminException,
PulsarClientException {
admin.topics().unload(topic);
Expand Down

0 comments on commit 6831231

Please sign in to comment.