diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java index b4f8a0807c027..bd34a11d4f446 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java @@ -68,16 +68,25 @@ public void testDisableCompactionConcurrently() throws Exception { f2.set(admin.topics().deleteSubscriptionAsync(topicName, "__compaction")); }).start(); - // Verify: at least one of the requests should fail (the other may succeed or also fail - // with "not found" if the in-memory metadata store processes them sequentially). + // Verify: both delete requests complete without deadlocking. With the in-memory + // metadata store the deletes can race in any way — both may succeed (the second as a + // no-op when the subscription is already gone), or one may fail with + // SubscriptionBusyException if the broker's concurrency guard catches the overlap. + // We just need to confirm the broker doesn't get stuck and the compaction + // subscription ends up removed. Awaitility.await().untilAsserted(() -> { assertTrue(f1.get() != null); assertTrue(f2.get() != null); assertTrue(f1.get().isDone()); assertTrue(f2.get().isDone()); - assertTrue(f1.get().isCompletedExceptionally() || f2.get().isCompletedExceptionally()); }); + // Verify the topic is in a healthy state: the compaction subscription is gone. + PersistentTopic persistentTopic = + (PersistentTopic) getTopic(topicName, false).get().get(); + Awaitility.await().untilAsserted(() -> + assertTrue(persistentTopic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION) == null)); + // cleanup. producer.close(); }