diff --git a/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp b/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp index 5ef17eef3..4100c8023 100644 --- a/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp +++ b/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include #include @@ -52,17 +52,19 @@ namespace { Mutex* mutex; int commandId; - std::map >* map; + HashMap >* map; public: - ResponseFinalizer(Mutex* mutex, int commandId, std::map >* map) : + ResponseFinalizer(Mutex* mutex, int commandId, HashMap >* map) : mutex(mutex), commandId(commandId), map(map) { } ~ResponseFinalizer() { synchronized(mutex){ - map->erase(commandId); + try { + map->remove(commandId); + } catch (...) {} } } }; @@ -80,7 +82,7 @@ namespace correlator{ decaf::util::concurrent::atomic::AtomicInteger nextCommandId; // Map of request ids to future response objects. - std::map > requestMap; + HashMap > requestMap; // Sync object for accessing the request map. decaf::util::concurrent::Mutex mapMutex; @@ -148,8 +150,7 @@ Pointer ResponseCorrelator::asyncRequest(const Pointer synchronized(&this->impl->mapMutex) { priorError = this->impl->priorError; if (priorError == NULL) { - this->impl->requestMap.insert( - make_pair((unsigned int) command->getCommandId(), futureResponse)); + this->impl->requestMap.put((unsigned int) command->getCommandId(), futureResponse); } } @@ -169,7 +170,7 @@ Pointer ResponseCorrelator::asyncRequest(const Pointer next->oneway(command); } catch (Exception &ex) { // We have to ensure this gets cleaned out otherwise we can consume memory over time. - this->impl->requestMap.erase(command->getCommandId()); + this->impl->requestMap.remove(command->getCommandId()); throw; } @@ -199,8 +200,7 @@ Pointer ResponseCorrelator::request(const Pointer command) { synchronized(&this->impl->mapMutex) { priorError = this->impl->priorError; if (priorError == NULL) { - this->impl->requestMap.insert( - make_pair((unsigned int) command->getCommandId(), futureResponse)); + this->impl->requestMap.put((unsigned int) command->getCommandId(), futureResponse); } } @@ -251,8 +251,7 @@ Pointer ResponseCorrelator::request(const Pointer command, un synchronized(&this->impl->mapMutex) { priorError = this->impl->priorError; if (priorError == NULL) { - this->impl->requestMap.insert( - make_pair((unsigned int) command->getCommandId(), futureResponse)); + this->impl->requestMap.put((unsigned int) command->getCommandId(), futureResponse); } } @@ -301,16 +300,13 @@ void ResponseCorrelator::onCommand(const Pointer command) { // It is a response - let's correlate ... synchronized(&this->impl->mapMutex) { - // Look the future request up based on the correlation id. - std::map >::iterator iter = - this->impl->requestMap.find(response->getCorrelationId()); - if (iter == this->impl->requestMap.end()) { + Pointer futureResponse; + try { + futureResponse = this->impl->requestMap.remove(response->getCorrelationId()); + } catch (NoSuchElementException& ex) { return; } - // Get the future response (if it's in the map, it's not NULL). - Pointer futureResponse = iter->second; - // Set the response property in the future response. futureResponse->setResponse(response); } @@ -340,10 +336,7 @@ void ResponseCorrelator::dispose(Pointer error) { if (this->impl->priorError == NULL) { this->impl->priorError = error; requests.ensureCapacity((int)this->impl->requestMap.size()); - std::map >::iterator iter = this->impl->requestMap.begin(); - for (; iter != this->impl->requestMap.end(); ++iter) { - requests.add(iter->second); - } + requests.copy(this->impl->requestMap.values()); this->impl->requestMap.clear(); } } diff --git a/activemq-cpp/src/test-integration/TestRegistry.cpp b/activemq-cpp/src/test-integration/TestRegistry.cpp index 2a2b595b7..d0e4bc6db 100644 --- a/activemq-cpp/src/test-integration/TestRegistry.cpp +++ b/activemq-cpp/src/test-integration/TestRegistry.cpp @@ -55,7 +55,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireAdvisorysTest CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireAsyncSenderTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireClientAckTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsConnectionStartStopTest ); -//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest ); +CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsTemplateTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest );