diff --git a/apm-agent-plugins/apm-jms-plugin/src/test/java/co/elastic/apm/agent/jms/JmsInstrumentationIT.java b/apm-agent-plugins/apm-jms-plugin/src/test/java/co/elastic/apm/agent/jms/JmsInstrumentationIT.java index 4865ffac58..53af2300b5 100644 --- a/apm-agent-plugins/apm-jms-plugin/src/test/java/co/elastic/apm/agent/jms/JmsInstrumentationIT.java +++ b/apm-agent-plugins/apm-jms-plugin/src/test/java/co/elastic/apm/agent/jms/JmsInstrumentationIT.java @@ -56,6 +56,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; @@ -122,7 +123,7 @@ public void testQueueSendReceiveOnTracedThread() throws Exception { @Test public void testQueueSendReceiveNoWaitOnTracedThread() throws Exception { final Queue queue = brokerFacade.createQueue(TEST_Q_NAME); - testQueueSendReceiveOnTracedThread(() -> brokerFacade.receiveNoWait(queue), queue); + testQueueSendReceiveOnTracedThread(() -> loopReceive(() -> brokerFacade.receiveNoWait(queue), 3000), queue); } @Test @@ -134,7 +135,23 @@ public void testQueueSendReceiveOnNonTracedThread() throws Exception { @Test public void testQueueSendReceiveNoWaitOnNonTracedThread() throws Exception { final Queue queue = brokerFacade.createQueue(TEST_Q_NAME); - testQueueSendReceiveOnNonTracedThread(() -> brokerFacade.receiveNoWait(queue), queue); + testQueueSendReceiveOnNonTracedThread(() -> loopReceive(() -> brokerFacade.receiveNoWait(queue), 3000), queue); + } + + // A utility method for testing the receiveNoWait API consistently + private Message loopReceive(Callable receiveMethod, @SuppressWarnings("SameParameterValue") long timeout) throws Exception { + long start = System.currentTimeMillis(); + long curr = start; + while (curr - start < timeout) { + Message ret = receiveMethod.call(); + if (ret != null) { + return ret; + } else { + Thread.sleep(100); + curr = System.currentTimeMillis(); + } + } + throw new TimeoutException(); } private void testQueueSendReceiveOnTracedThread(Callable receiveMethod, Queue queue) throws Exception {