Skip to content
Browse files

https://issues.apache.org/jira/browse/AMQCPP-532

Add test case to try and reproduce.
  • Loading branch information...
1 parent 854482b commit ec8af34d7733a620c1dc4e269e1b2f13716dedf8 @tabish121 tabish121 committed Jan 27, 2014
View
154 activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp
@@ -47,67 +47,66 @@ QueueBrowserTest::~QueueBrowserTest() {
////////////////////////////////////////////////////////////////////////////////
void QueueBrowserTest::testReceiveBrowseReceive() {
- cms::Session* session( cmsProvider->getSession() );
+ cms::Session* session(cmsProvider->getSession());
- std::auto_ptr<cms::Queue> queue( session->createQueue("testReceiveBrowseReceive") );
+ std::auto_ptr<cms::Queue> queue(session->createQueue("testReceiveBrowseReceive"));
- std::auto_ptr<cms::MessageConsumer> consumer( session->createConsumer( queue.get() ) );
- std::auto_ptr<cms::MessageProducer> producer( session->createProducer( queue.get() ) );
+ std::auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get()));
+ std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
- std::auto_ptr<cms::TextMessage> message1( session->createTextMessage( "First Message" ) );
- std::auto_ptr<cms::TextMessage> message2( session->createTextMessage( "Second Message" ) );
- std::auto_ptr<cms::TextMessage> message3( session->createTextMessage( "Third Message" ) );
+ std::auto_ptr<cms::TextMessage> message1(session->createTextMessage("First Message"));
+ std::auto_ptr<cms::TextMessage> message2(session->createTextMessage("Second Message"));
+ std::auto_ptr<cms::TextMessage> message3(session->createTextMessage("Third Message"));
// lets consume any outstanding messages from previous test runs
cms::Message* message;
- while( ( message = consumer->receive( 1000 ) ) != NULL ) {
+ while ((message = consumer->receive(1000)) != NULL) {
delete message;
}
- producer->send( message1.get() );
- producer->send( message2.get() );
- producer->send( message3.get() );
+ producer->send(message1.get());
+ producer->send(message2.get());
+ producer->send(message3.get());
// Get the first.
- std::auto_ptr<cms::TextMessage> inbound(
- dynamic_cast<cms::TextMessage*>( consumer->receive( 1000 ) ) );
- CPPUNIT_ASSERT( inbound.get() != NULL );
- CPPUNIT_ASSERT_EQUAL( message1->getText(), inbound->getText() );
+ std::auto_ptr<cms::TextMessage> inbound(dynamic_cast<cms::TextMessage*>(consumer->receive(1000)));
+ CPPUNIT_ASSERT(inbound.get() != NULL);
+ CPPUNIT_ASSERT_EQUAL(message1->getText(), inbound->getText());
consumer->close();
- std::auto_ptr<cms::QueueBrowser> browser( session->createBrowser( queue.get() ) );
+ std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
cms::MessageEnumeration* enumeration = browser->getEnumeration();
// browse the second
- CPPUNIT_ASSERT_MESSAGE( "should have received the second message", enumeration->hasMoreMessages() );
- inbound.reset( dynamic_cast<cms::TextMessage*>( enumeration->nextMessage() ) );
- CPPUNIT_ASSERT( inbound.get() != NULL );
- CPPUNIT_ASSERT_EQUAL( message2->getText(), inbound->getText() );
+ CPPUNIT_ASSERT_MESSAGE("should have received the second message", enumeration->hasMoreMessages());
+ inbound.reset(dynamic_cast<cms::TextMessage*>(enumeration->nextMessage()));
+ CPPUNIT_ASSERT(inbound.get() != NULL);
+ CPPUNIT_ASSERT_EQUAL(message2->getText(), inbound->getText());
// browse the third.
- CPPUNIT_ASSERT_MESSAGE( "should have received the third message", enumeration->hasMoreMessages() );
- inbound.reset( dynamic_cast<cms::TextMessage*>( enumeration->nextMessage() ) );
- CPPUNIT_ASSERT( inbound.get() != NULL );
- CPPUNIT_ASSERT_EQUAL( message3->getText(), inbound->getText() );
+ CPPUNIT_ASSERT_MESSAGE("should have received the third message", enumeration->hasMoreMessages());
+ inbound.reset(dynamic_cast<cms::TextMessage*>(enumeration->nextMessage()));
+ CPPUNIT_ASSERT(inbound.get() != NULL);
+ CPPUNIT_ASSERT_EQUAL(message3->getText(), inbound->getText());
// There should be no more.
bool tooMany = false;
- while( enumeration->hasMoreMessages() ) {
+ while (enumeration->hasMoreMessages()) {
tooMany = true;
}
- CPPUNIT_ASSERT_MESSAGE( "Should not have browsed any more messages", !tooMany );
+ CPPUNIT_ASSERT_MESSAGE("Should not have browsed any more messages", !tooMany);
browser->close();
// Re-open the consumer
- consumer.reset( session->createConsumer( queue.get() ) );
+ consumer.reset(session->createConsumer(queue.get()));
// Receive the second.
- inbound.reset( dynamic_cast<cms::TextMessage*>( consumer->receive( 1000 ) ) );
- CPPUNIT_ASSERT( inbound.get() != NULL );
- CPPUNIT_ASSERT_EQUAL( message2->getText(), inbound->getText() );
+ inbound.reset(dynamic_cast<cms::TextMessage*>(consumer->receive(1000)));
+ CPPUNIT_ASSERT(inbound.get() != NULL);
+ CPPUNIT_ASSERT_EQUAL(message2->getText(), inbound->getText());
// Receive the third.
- inbound.reset( dynamic_cast<cms::TextMessage*>( consumer->receive( 1000 ) ) );
- CPPUNIT_ASSERT( inbound.get() != NULL );
- CPPUNIT_ASSERT_EQUAL( message3->getText(), inbound->getText() );
+ inbound.reset(dynamic_cast<cms::TextMessage*>(consumer->receive(1000)));
+ CPPUNIT_ASSERT(inbound.get() != NULL);
+ CPPUNIT_ASSERT_EQUAL(message3->getText(), inbound->getText());
consumer->close();
browser->close();
@@ -120,33 +119,30 @@ void QueueBrowserTest::testBrowseReceive() {
std::auto_ptr<cms::TextMessage> inbound;
- cms::Session* session( cmsProvider->getSession() );
-
- std::auto_ptr<cms::Queue> queue( session->createQueue("testBrowseReceive") );
-
- std::auto_ptr<cms::TextMessage> message1( session->createTextMessage( "First Message" ) );
-
- std::auto_ptr<cms::MessageProducer> producer( session->createProducer( queue.get() ) );
+ cms::Session* session(cmsProvider->getSession());
+ std::auto_ptr<cms::Queue> queue(session->createQueue("testBrowseReceive"));
+ std::auto_ptr<cms::TextMessage> message1(session->createTextMessage("First Message"));
+ std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
- producer->send( message1.get() );
+ producer->send(message1.get());
// create browser first
- std::auto_ptr<cms::QueueBrowser> browser( session->createBrowser( queue.get() ) );
+ std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
cms::MessageEnumeration* enumeration = browser->getEnumeration();
// create consumer
- std::auto_ptr<cms::MessageConsumer> consumer( session->createConsumer( queue.get() ) );
+ std::auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get()));
// browse the first message
- CPPUNIT_ASSERT_MESSAGE( "should have received the first message", enumeration->hasMoreMessages() );
- inbound.reset( dynamic_cast<cms::TextMessage*>( enumeration->nextMessage() ) );
- CPPUNIT_ASSERT( inbound.get() != NULL );
- CPPUNIT_ASSERT_EQUAL( message1->getText(), inbound->getText() );
+ CPPUNIT_ASSERT_MESSAGE("should have received the first message", enumeration->hasMoreMessages());
+ inbound.reset(dynamic_cast<cms::TextMessage*>(enumeration->nextMessage()));
+ CPPUNIT_ASSERT(inbound.get() != NULL);
+ CPPUNIT_ASSERT_EQUAL(message1->getText(), inbound->getText());
// Receive the first message.
- inbound.reset( dynamic_cast<cms::TextMessage*>( consumer->receive( 1000 ) ) );
- CPPUNIT_ASSERT( inbound.get() != NULL );
- CPPUNIT_ASSERT_EQUAL( message1->getText(), inbound->getText() );
+ inbound.reset(dynamic_cast<cms::TextMessage*>(consumer->receive(1000)));
+ CPPUNIT_ASSERT(inbound.get() != NULL);
+ CPPUNIT_ASSERT_EQUAL(message1->getText(), inbound->getText());
consumer->close();
browser->close();
@@ -158,17 +154,17 @@ void QueueBrowserTest::testQueueBrowserWith2Consumers() {
static const int numMessages = 100;
- ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*> (cmsProvider->getConnection());
- CPPUNIT_ASSERT( connection != NULL );
+ ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
+ CPPUNIT_ASSERT(connection != NULL);
connection->setAlwaysSyncSend(false);
std::auto_ptr<cms::Session> session(connection->createSession(cms::Session::CLIENT_ACKNOWLEDGE));
std::auto_ptr<cms::Queue> queue(session->createQueue("testQueueBrowserWith2Consumers"));
std::auto_ptr<cms::Queue> queuePrefetch10(
- session->createQueue("testQueueBrowserWith2Consumers?consumer.prefetchSize=10") );
+ session->createQueue("testQueueBrowserWith2Consumers?consumer.prefetchSize=10"));
std::auto_ptr<cms::Queue> queuePrefetch1(
- session->createQueue("testQueueBrowserWith2Consumers?consumer.prefetchSize=1") );
+ session->createQueue("testQueueBrowserWith2Consumers?consumer.prefetchSize=1"));
std::auto_ptr<ActiveMQConnectionFactory> factory(new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
std::auto_ptr<ActiveMQConnection> connection2(dynamic_cast<ActiveMQConnection*>(factory->createConnection()));
@@ -183,34 +179,66 @@ void QueueBrowserTest::testQueueBrowserWith2Consumers() {
for (int i = 0; i < numMessages; i++) {
std::auto_ptr<cms::TextMessage> message(
- session->createTextMessage(std::string("Message: ") + Integer::toString(i)));
+ session->createTextMessage(std::string("Message: ") + Integer::toString(i)));
producer->send(message.get());
}
- std::auto_ptr<cms::QueueBrowser> browser( session2->createBrowser( queuePrefetch1.get() ) );
+ std::auto_ptr<cms::QueueBrowser> browser(session2->createBrowser(queuePrefetch1.get()));
cms::MessageEnumeration* browserView = browser->getEnumeration();
std::vector<cms::Message*> messages;
for (int i = 0; i < numMessages; i++) {
- cms::Message* m1 = consumer->receive( 5000 );
- CPPUNIT_ASSERT_MESSAGE( std::string( "m1 is null for index: " ) + Integer::toString( i ), m1 != NULL );
- messages.push_back( m1 );
+ cms::Message* m1 = consumer->receive(5000);
+ CPPUNIT_ASSERT_MESSAGE(std::string("m1 is null for index: ") + Integer::toString(i), m1 != NULL);
+ messages.push_back(m1);
}
for (int i = 0; i < numMessages && browserView->hasMoreMessages(); i++) {
cms::Message* m1 = messages[i];
cms::Message* m2 = browserView->nextMessage();
- CPPUNIT_ASSERT_MESSAGE( std::string( "m2 is null for index: " ) + Integer::toString( i ), m2 != NULL );
- CPPUNIT_ASSERT( m1->getCMSMessageID() == m2->getCMSMessageID() );
+ CPPUNIT_ASSERT_MESSAGE(std::string("m2 is null for index: ") + Integer::toString(i), m2 != NULL);
+ CPPUNIT_ASSERT(m1->getCMSMessageID() == m2->getCMSMessageID());
delete m2;
}
- CPPUNIT_ASSERT_MESSAGE( "nothing left in the browser", !browserView->hasMoreMessages() );
- CPPUNIT_ASSERT_MESSAGE( "consumer finished", consumer->receiveNoWait() == NULL );
+ CPPUNIT_ASSERT_MESSAGE("nothing left in the browser", !browserView->hasMoreMessages());
+ CPPUNIT_ASSERT_MESSAGE("consumer finished", consumer->receiveNoWait() == NULL);
for (std::size_t ix = 0; ix < messages.size(); ++ix) {
cms::Message* msg = messages[ix];
msg->acknowledge();
delete msg;
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+void QueueBrowserTest::testRepeatedQueueBrowserCreateDestroy() {
+
+ ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
+ CPPUNIT_ASSERT(connection != NULL);
+
+ std::auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
+ std::auto_ptr<cms::Queue> queue(session->createTemporaryQueue());
+
+ std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
+ std::auto_ptr<cms::TextMessage> textMessage(session->createTextMessage("Test"));
+ producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+ producer->send(textMessage.get());
+ session->commit();
+
+ connection->start();
+
+ std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
+
+ for (int i = 0; i < 200; i++) {
+ browser.reset(session->createBrowser(queue.get()));
+ cms::MessageEnumeration* browserView = browser->getEnumeration();
+
+ if (browserView->hasMoreMessages()) {
+ std::auto_ptr<cms::Message> message(browserView->nextMessage());
+ CPPUNIT_ASSERT(message.get() != NULL);
+ }
+
+ browser.reset(NULL);
+ }
+}
View
1 activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h
@@ -33,6 +33,7 @@ namespace test {
void testReceiveBrowseReceive();
void testBrowseReceive();
void testQueueBrowserWith2Consumers();
+ void testRepeatedQueueBrowserCreateDestroy();
};
View
1 activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
@@ -31,6 +31,7 @@ namespace openwire {
CPPUNIT_TEST( testReceiveBrowseReceive );
CPPUNIT_TEST( testBrowseReceive );
CPPUNIT_TEST( testQueueBrowserWith2Consumers );
+ CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroy );
CPPUNIT_TEST_SUITE_END();
public:

0 comments on commit ec8af34

Please sign in to comment.
Something went wrong with that request. Please try again.