Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public void run() {
LOG.info("Received: " + msg);
Assert.assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
proxy.close();
}

@Test
Expand Down Expand Up @@ -857,6 +858,7 @@ public void testAutoRollbackWithMissingRedeliveries() throws Exception {
broker.stop();
broker = createBroker();
broker.start();

Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
} finally {
connection.close();
Expand All @@ -872,45 +874,57 @@ public void testWaitForMissingRedeliveries() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME);
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
try {
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME);
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);

produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
Assert.assertNotNull("got message just produced", msg);
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
Assert.assertNotNull("got message just produced", msg);

broker.stop();
broker = createBroker();
broker.start();
broker.stop();
broker = createBroker();
broker.start();

final CountDownLatch commitDone = new CountDownLatch(1);
// will block pending re-deliveries
new Thread() {
@Override
public void run() {
LOG.info("doing async commit...");
try {
consumerSession.commit();
commitDone.countDown();
} catch (JMSException ignored) {
final CountDownLatch commitDone = new CountDownLatch(1);
final CountDownLatch gotException = new CountDownLatch(1);
// will block pending re-deliveries
new Thread() {
@Override
public void run() {
LOG.info("doing async commit...");
try {
consumerSession.commit();
commitDone.countDown();
}
catch (JMSException ignored) {
System.out.println("--->err: got exfeption:");
ignored.printStackTrace();
gotException.countDown();
}
finally {
commitDone.countDown();
}
}
}
}.start();
}.start();

broker.stop();
broker = createBroker();
broker.start();
broker.stop();
broker = createBroker();
broker.start();

Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
Assert.assertTrue("got exception on commit", gotException.await(30, TimeUnit.SECONDS));

Assert.assertNull("should not get committed message", consumer.receive(5000));
connection.close();
Assert.assertNotNull("should get failed committed message", consumer.receive(5000));
} finally {
connection.close();
}
}

@Test
Expand Down