3131import com .google .common .collect .Sets ;
3232import io .opentelemetry .api .common .Attributes ;
3333import java .net .URL ;
34+ import java .time .Duration ;
3435import java .util .HashSet ;
3536import java .util .Optional ;
3637import java .util .Set ;
3738import java .util .concurrent .CountDownLatch ;
39+ import java .util .concurrent .CyclicBarrier ;
3840import java .util .concurrent .ExecutorService ;
3941import java .util .concurrent .Executors ;
4042import java .util .concurrent .LinkedBlockingQueue ;
4143import java .util .concurrent .ThreadPoolExecutor ;
4244import java .util .concurrent .TimeUnit ;
4345import java .util .concurrent .atomic .AtomicBoolean ;
44- import java .util .concurrent .atomic .AtomicInteger ;
46+ import java .util .concurrent .atomic .AtomicReference ;
4547import lombok .Cleanup ;
4648import org .apache .pulsar .broker .BrokerTestUtil ;
4749import org .apache .pulsar .broker .PulsarService ;
@@ -847,14 +849,18 @@ public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerN
847849 *
848850 * @throws Exception
849851 */
850- @ Test
852+ @ Test ( timeOut = 60000 )
851853 public void testMsgDropStat () throws Exception {
852854
853855 int defaultNonPersistentMessageRate = conf .getMaxConcurrentNonPersistentMessagePerConnection ();
854856 try {
855857 final String topicName = BrokerTestUtil .newUniqueName ("non-persistent://my-property/my-ns/stats-topic" );
856- // restart broker with lower publish rate limit
857- conf .setMaxConcurrentNonPersistentMessagePerConnection (1 );
858+
859+ // For non-persistent topics, set the per-connection in-flight limit to 0.
860+ // Since ServerCnx drops when inFlight > max; with max=0, any second overlapping send on the
861+ // same connection is dropped (entryId == -1) and recorded. This makes observing a publisher drop
862+ // reliable in this test.
863+ conf .setMaxConcurrentNonPersistentMessagePerConnection (0 );
858864 stopBroker ();
859865 startBroker ();
860866
@@ -873,36 +879,55 @@ public void testMsgDropStat() throws Exception {
873879 .enableBatching (false )
874880 .messageRoutingMode (MessageRoutingMode .SinglePartition )
875881 .create ();
882+
883+ final int threads = 10 ;
876884 @ Cleanup ("shutdownNow" )
877- ExecutorService executor = Executors .newFixedThreadPool (10 );
885+ ExecutorService executor = Executors .newFixedThreadPool (threads );
878886 byte [] msgData = "testData" .getBytes ();
879- final int totalProduceMessages = 1000 ;
880- CountDownLatch latch = new CountDownLatch (1 );
881- AtomicInteger messagesSent = new AtomicInteger (0 );
882- for (int i = 0 ; i < totalProduceMessages ; i ++) {
883- executor .submit (() -> {
884- try {
885- MessageId msgId = producer .send (msgData );
886- int count = messagesSent .incrementAndGet ();
887- // process at least 20% of messages before signalling the latch
888- // a non-persistent message will return entryId as -1 when it has been dropped
889- // due to setMaxConcurrentNonPersistentMessagePerConnection limit
890- // also ensure that it has happened before the latch is signalled
891- if (count > totalProduceMessages * 0.2 && msgId != null
892- && ((MessageIdImpl ) msgId ).getEntryId () == -1 ) {
893- latch .countDown ();
887+
888+ /*
889+ * Trigger at least one publisher drop through concurrent send() calls.
890+ *
891+ * Uses CyclicBarrier to ensure all threads send simultaneously, creating overlap.
892+ * With maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx#handleSend
893+ * drops any send while another is in-flight, returning MessageId with entryId = -1.
894+ * Awaitility repeats whole bursts (bounded to 20s) until a drop is observed.
895+ */
896+ AtomicBoolean publisherDropSeen = new AtomicBoolean (false );
897+ Awaitility .await ().atMost (Duration .ofSeconds (20 )).until (() -> {
898+ CyclicBarrier barrier = new CyclicBarrier (threads );
899+ CountDownLatch completionLatch = new CountDownLatch (threads );
900+ AtomicReference <Throwable > error = new AtomicReference <>();
901+ publisherDropSeen .set (false );
902+
903+ for (int i = 0 ; i < threads ; i ++) {
904+ executor .submit (() -> {
905+ try {
906+ barrier .await ();
907+ MessageId msgId = producer .send (msgData );
908+ // Publisher drop is signaled by MessageIdImpl.entryId == -1
909+ if (msgId instanceof MessageIdImpl && ((MessageIdImpl ) msgId ).getEntryId () == -1 ) {
910+ publisherDropSeen .set (true );
911+ }
912+ } catch (Throwable t ) {
913+ if (t instanceof InterruptedException ) {
914+ Thread .currentThread ().interrupt ();
915+ }
916+ error .compareAndSet (null , t );
917+ } finally {
918+ completionLatch .countDown ();
894919 }
920+ });
921+ }
895922
896- Thread .sleep (10 );
897- } catch (PulsarClientException e ) {
898- throw new RuntimeException (e );
899- } catch (InterruptedException e ) {
900- Thread .currentThread ().interrupt ();
901- throw new RuntimeException (e );
902- }
903- });
904- }
905- assertTrue (latch .await (5 , TimeUnit .SECONDS ));
923+ // Wait for all sends to complete.
924+ assertTrue (completionLatch .await (20 , TimeUnit .SECONDS ));
925+
926+ assertNull (error .get (), "Concurrent send encountered an exception" );
927+ return publisherDropSeen .get ();
928+ });
929+
930+ assertTrue (publisherDropSeen .get (), "Expected at least one publisher drop (entryId == -1)" );
906931
907932 NonPersistentTopic topic =
908933 (NonPersistentTopic ) pulsar .getBrokerService ().getOrCreateTopic (topicName ).get ();
0 commit comments