Skip to content

Commit

Permalink
Fix potential segfault when using async sends with callback.
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-cpp/trunk@1464225 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Timothy A. Bish committed Apr 3, 2013
1 parent 93f0d06 commit 2f343b0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 24 deletions.
Expand Up @@ -21,7 +21,7 @@
#include <decaf/util/ArrayList.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <map>
#include <decaf/util/HashMap.h>

#include <activemq/commands/Response.h>
#include <activemq/commands/ExceptionResponse.h>
Expand Down Expand Up @@ -52,17 +52,19 @@ namespace {

Mutex* mutex;
int commandId;
std::map<unsigned int, Pointer<FutureResponse> >* map;
HashMap<unsigned int, Pointer<FutureResponse> >* map;

public:

ResponseFinalizer(Mutex* mutex, int commandId, std::map<unsigned int, Pointer<FutureResponse> >* map) :
ResponseFinalizer(Mutex* mutex, int commandId, HashMap<unsigned int, Pointer<FutureResponse> >* map) :
mutex(mutex), commandId(commandId), map(map) {
}

~ResponseFinalizer() {
synchronized(mutex){
map->erase(commandId);
try {
map->remove(commandId);
} catch (...) {}
}
}
};
Expand All @@ -80,7 +82,7 @@ namespace correlator{
decaf::util::concurrent::atomic::AtomicInteger nextCommandId;

// Map of request ids to future response objects.
std::map<unsigned int, Pointer<FutureResponse> > requestMap;
HashMap<unsigned int, Pointer<FutureResponse> > requestMap;

// Sync object for accessing the request map.
decaf::util::concurrent::Mutex mapMutex;
Expand Down Expand Up @@ -148,8 +150,7 @@ Pointer<FutureResponse> ResponseCorrelator::asyncRequest(const Pointer<Command>
synchronized(&this->impl->mapMutex) {
priorError = this->impl->priorError;
if (priorError == NULL) {
this->impl->requestMap.insert(
make_pair((unsigned int) command->getCommandId(), futureResponse));
this->impl->requestMap.put((unsigned int) command->getCommandId(), futureResponse);
}
}

Expand All @@ -169,7 +170,7 @@ Pointer<FutureResponse> ResponseCorrelator::asyncRequest(const Pointer<Command>
next->oneway(command);
} catch (Exception &ex) {
// We have to ensure this gets cleaned out otherwise we can consume memory over time.
this->impl->requestMap.erase(command->getCommandId());
this->impl->requestMap.remove(command->getCommandId());
throw;
}

Expand Down Expand Up @@ -199,8 +200,7 @@ Pointer<Response> ResponseCorrelator::request(const Pointer<Command> command) {
synchronized(&this->impl->mapMutex) {
priorError = this->impl->priorError;
if (priorError == NULL) {
this->impl->requestMap.insert(
make_pair((unsigned int) command->getCommandId(), futureResponse));
this->impl->requestMap.put((unsigned int) command->getCommandId(), futureResponse);
}
}

Expand Down Expand Up @@ -251,8 +251,7 @@ Pointer<Response> ResponseCorrelator::request(const Pointer<Command> command, un
synchronized(&this->impl->mapMutex) {
priorError = this->impl->priorError;
if (priorError == NULL) {
this->impl->requestMap.insert(
make_pair((unsigned int) command->getCommandId(), futureResponse));
this->impl->requestMap.put((unsigned int) command->getCommandId(), futureResponse);
}
}

Expand Down Expand Up @@ -301,16 +300,13 @@ void ResponseCorrelator::onCommand(const Pointer<Command> command) {
// It is a response - let's correlate ...
synchronized(&this->impl->mapMutex) {

// Look the future request up based on the correlation id.
std::map<unsigned int, Pointer<FutureResponse> >::iterator iter =
this->impl->requestMap.find(response->getCorrelationId());
if (iter == this->impl->requestMap.end()) {
Pointer<FutureResponse> futureResponse;
try {
futureResponse = this->impl->requestMap.remove(response->getCorrelationId());
} catch (NoSuchElementException& ex) {
return;
}

// Get the future response (if it's in the map, it's not NULL).
Pointer<FutureResponse> futureResponse = iter->second;

// Set the response property in the future response.
futureResponse->setResponse(response);
}
Expand Down Expand Up @@ -340,10 +336,7 @@ void ResponseCorrelator::dispose(Pointer<Exception> error) {
if (this->impl->priorError == NULL) {
this->impl->priorError = error;
requests.ensureCapacity((int)this->impl->requestMap.size());
std::map<unsigned int, Pointer<FutureResponse> >::iterator iter = this->impl->requestMap.begin();
for (; iter != this->impl->requestMap.end(); ++iter) {
requests.add(iter->second);
}
requests.copy(this->impl->requestMap.values());
this->impl->requestMap.clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion activemq-cpp/src/test-integration/TestRegistry.cpp
Expand Up @@ -55,7 +55,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireAdvisorysTest
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireAsyncSenderTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireClientAckTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsConnectionStartStopTest );
//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireCmsTemplateTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest );
Expand Down

0 comments on commit 2f343b0

Please sign in to comment.