Skip to content

Commit

Permalink
NO-JIRA making MirroredSubscriptionTest more challenging
Browse files Browse the repository at this point in the history
The test is now setting the mirror to sync
it will block until the first subscription is consumed, kill the servers and restart them
check all the counters

and then start another 4 consumers and at the end check all the counters.

Mirror is now sync making the test more useful and challenging.
  • Loading branch information
clebertsuconic committed Jul 21, 2023
1 parent c50d97d commit 04f29e0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ under the License.

<broker-connections>
<amqp-connection uri="tcp://localhost:61617" name="mirror" retry-interval="100">
<mirror/>
<mirror sync="true"/>
</amqp-connection>
</broker-connections>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
public void beforeClass() throws Exception {
cleanupData(SERVER_NAME_A);
cleanupData(SERVER_NAME_B);
startServers();
}

private void startServers() throws Exception {
processB = startServer(SERVER_NAME_B, 1, 0);
processA = startServer(SERVER_NAME_A, 0, 0);

Expand All @@ -64,10 +68,9 @@ public void beforeClass() throws Exception {
}

@Test
public void testSend() throws Throwable {

public void testConsumeAll() throws Throwable {
int COMMIT_INTERVAL = 100;
int NUMBER_OF_MESSAGES = 1000;
int NUMBER_OF_MESSAGES = 300;
int CLIENTS = 5;
String mainURI = "tcp://localhost:61616";
String secondURI = "tcp://localhost:61617";
Expand Down Expand Up @@ -111,6 +114,7 @@ public void testSend() throws Throwable {

for (int i = 0; i < CLIENTS; i++) {
final int clientID = i;
CountDownLatch threadDone = new CountDownLatch(1);
executorService.execute(() -> {
try (Connection connection = cf.createConnection()) {
connection.setClientID("client" + clientID);
Expand All @@ -132,8 +136,27 @@ public void testSend() throws Throwable {
errors.incrementAndGet();
} finally {
done.countDown();
threadDone.countDown();
}
});

if (clientID == 0) {
// The first execution will block until finished, we will then kill all the servers and make sure
// all the counters are preserved.
Assert.assertTrue(threadDone.await(300, TimeUnit.SECONDS));
processA.destroyForcibly();
processB.destroyForcibly();
Wait.assertFalse(processA::isAlive);
Wait.assertFalse(processB::isAlive);
startServers();
Wait.assertEquals(0, () -> getMessageCount(mainURI, "client0.subscription0"));
Wait.assertEquals(0, () -> getMessageCount(secondURI, "client0.subscription0"));
for (int checkID = 1; checkID < CLIENTS; checkID++) {
int checkFinal = checkID;
Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(mainURI, "client" + checkFinal + ".subscription" + checkFinal), 2000, 100);
Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(secondURI, "client" + checkFinal + ".subscription" + checkFinal), 2000, 100);
}
}
}

Assert.assertTrue(done.await(300, TimeUnit.SECONDS));
Expand Down

0 comments on commit 04f29e0

Please sign in to comment.