Skip to content
Permalink
Browse files
NO-JIRA Making AmqpFlowControlFailTest more accurate
this test was relying on internal details such as number of credits on the link.
The test was flaky and eventually failing or hunging.
  • Loading branch information
clebertsuconic committed Apr 7, 2022
1 parent e774e4f commit bad1c2658278b716d9266a90d9e4993733ab3b1f
Showing 2 changed files with 30 additions and 9 deletions.
@@ -1846,13 +1846,32 @@ private final class ExpiryReaper extends ActiveMQScheduledComponent {
super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
}

volatile CountDownLatch inUseLatch;


@Override
public void stop() {
super.stop();
// this will do a best effort to stop the current latch.
// no big deal if it failed. this is just to optimize this component stop.
CountDownLatch latch = inUseLatch;
if (latch != null) {
latch.countDown();
}
}


@Override
public void run() {
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
for (Queue queue : iterableOf(getLocalQueues())) {
if (!isStarted()) {
break;
}
try {
CountDownLatch latch = new CountDownLatch(1);
this.inUseLatch = latch;
queue.expireReferences(latch::countDown);
// the idea is in fact to block the Reaper while the Queue is executing reaping.
// This would avoid another eventual expiry to be called if the period for reaping is too small
@@ -19,13 +19,13 @@
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
@@ -35,6 +35,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -77,10 +78,10 @@ protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
}


@Test(timeout = 60000)
@Test(timeout = 10_000)
public void testAddressFullDisposition() throws Exception {
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
AmqpConnection connection = addConnection(client.connect());
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName(), null, null, outcomes);
@@ -95,6 +96,7 @@ public void testAddressFullDisposition() throws Exception {
rejected = true;
assertTrue(String.format("Unexpected message expected %s to contain %s", e.getMessage(), expectedMessage),
e.getMessage().contains(expectedMessage));
break;
}
}

@@ -112,10 +114,10 @@ protected void configureAddressPolicy(ActiveMQServer server) {
AmqpFlowControlFailTest.configureAddressPolicy(server);
}

@Test(timeout = 60000)
@Test
public void testMesagesNotSent() throws Exception {
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
AmqpConnection connection = addConnection(client.connect());
AmqpConnection connection = client.connect();
int messagesSent = 0;
try {
AmqpSession session = connection.createSession();
@@ -130,22 +132,22 @@ public void testMesagesNotSent() throws Exception {
messagesSent++;
} catch (IOException e) {
rejected = true;
break;
}
}
assertTrue(rejected);
rejected = false;
assertEquals(0, sender.getSender().getCredit());
AmqpSession session2 = connection.createSession();
AmqpReceiver receiver = session2.createReceiver(getQueueName());
receiver.flow(messagesSent);
for (int i = 0; i < messagesSent; i++) {
AmqpMessage receive = receiver.receive();
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(receive);
receive.accept();
}
receiver.close();
session2.close();

Wait.assertEquals(1000, sender.getSender()::getCredit);
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[100];
@@ -154,10 +156,10 @@ public void testMesagesNotSent() throws Exception {
sender.send(message);
} catch (IOException e) {
rejected = true;
break;
}
}
assertTrue(rejected);
assertEquals(0, sender.getSender().getCredit());
} finally {
connection.close();
}

0 comments on commit bad1c26

Please sign in to comment.