Skip to content

Commit

Permalink
[fix][flaky-test]ManagedCursorMetricsTest.testCursorReadWriteMetrics (a…
Browse files Browse the repository at this point in the history
…pache#17045)

(cherry picked from commit e0ff3d7)
(cherry picked from commit eece41b)
  • Loading branch information
poorbarcode authored and nicoloboschi committed Sep 16, 2022
1 parent 0fc39b3 commit f8cf23d
Showing 1 changed file with 24 additions and 3 deletions.
Expand Up @@ -19,10 +19,14 @@
package org.apache.pulsar.broker.stats;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -108,9 +112,19 @@ public void testManagedCursorMetrics() throws Exception {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
}

private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName)
throws ExecutionException, InterruptedException {
final PersistentSubscription persistentSubscription =
(PersistentSubscription) pulsar.getBrokerService()
.getTopic(topicName, false).get().get().getSubscription(subscriptionName);
final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor();
return managedCursor.getStats();
}

@Test
public void testCursorReadWriteMetrics() throws Exception {
final String subName = "read-write";
final String subName1 = "read-write-sub-1";
final String subName2 = "read-write-sub-2";
final String topicName = "persistent://my-namespace/use/my-ns/read-write";
final int messageSize = 10;

Expand All @@ -127,15 +141,15 @@ public void testCursorReadWriteMetrics() throws Exception {
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscriptionName(subName1)
.subscribe();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "-2")
.subscriptionName(subName2)
.subscribe();

@Cleanup
Expand All @@ -156,6 +170,13 @@ public void testCursorReadWriteMetrics() throws Exception {
consumer2.acknowledge(consumer.receive().getMessageId());
}
}

// Wait for persistent cursor meta.
ManagedCursorMXBean cursorMXBean1 = getManagedCursorMXBean(topicName, subName1);
ManagedCursorMXBean cursorMXBean2 = getManagedCursorMXBean(topicName, subName2);
Awaitility.await().until(() -> cursorMXBean1.getWriteCursorLedgerLogicalSize() > 0);
Awaitility.await().until(() -> cursorMXBean2.getWriteCursorLedgerLogicalSize() > 0);

metricsList = metrics.generate();
Assert.assertEquals(metricsList.size(), 2);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Expand Down

0 comments on commit f8cf23d

Please sign in to comment.