Permalink
Browse files

work on: https://issues.apache.org/jira/browse/AMQCPP-376

git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-cpp/trunk@1324807 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent f5ce627 commit ee0810f194e48c858daf41cc3f5c0d69f3405dcf Timothy A. Bish committed Apr 11, 2012
View
228 activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -40,6 +40,8 @@
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/TimeUnit.h>
#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
#include <activemq/commands/Command.h>
#include <activemq/commands/ActiveMQMessage.h>
@@ -107,6 +109,7 @@ namespace core{
Pointer<transport::Transport> transport;
Pointer<util::IdGenerator> clientIdGenerator;
Pointer<Scheduler> scheduler;
+ Pointer<ExecutorService> executor;
util::LongSequenceGenerator sessionIds;
util::LongSequenceGenerator tempDestinationIds;
@@ -192,6 +195,9 @@ namespace core{
this->connectionInfo.reset(new ConnectionInfo());
this->brokerInfoReceived.reset(new CountDownLatch(1));
+ this->executor.reset(
+ new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>()));
+
// Generate a connectionId
std::string uniqueId = CONNECTION_ID_GENERATOR.generateId();
decaf::lang::Pointer<ConnectionId> connectionId(new ConnectionId());
@@ -208,6 +214,90 @@ namespace core{
// Static init.
util::IdGenerator ConnectionConfig::CONNECTION_ID_GENERATOR;
+
+ class ConnectionErrorRunnable : public Runnable {
+ private:
+
+ ActiveMQConnection* connection;
+ Pointer<ConnectionError> error;
+
+ public:
+
+ ConnectionErrorRunnable(ActiveMQConnection* connection, Pointer<ConnectionError> error) :
+ Runnable(), connection(connection), error(error) {}
+ virtual ~ConnectionErrorRunnable() {}
+
+ virtual void run() {
+ try {
+ if (error != NULL && error->getException() != NULL) {
+ this->connection->onAsyncException(error->getException()->createExceptionObject());
+ }
+ } catch(Exception& ex) {}
+ }
+ };
+
+ class OnAsyncExceptionRunnable : public Runnable {
+ private:
+
+ ActiveMQConnection* connection;
+ Exception ex;
+
+ public:
+
+ OnAsyncExceptionRunnable(ActiveMQConnection* connection, Exception ex) :
+ Runnable(), connection(connection), ex(ex) {}
+ virtual ~OnAsyncExceptionRunnable() {}
+
+ virtual void run() {
+ try {
+ cms::ExceptionListener* listener = this->connection->getExceptionListener();
+ if (listener != NULL) {
+ ActiveMQException amqEx(ex);
+ listener->onException(amqEx.convertToCMSException());
+ }
+ } catch(Exception& ex) {}
+ }
+ };
+
+ class OnExceptionRunnable : public Runnable {
+ private:
+
+ ActiveMQConnection* connection;
+ ConnectionConfig* config;
+ Exception* ex;
+
+ public:
+
+ OnExceptionRunnable(ActiveMQConnection* connection, ConnectionConfig* config, Exception* ex) :
+ Runnable(), connection(connection), config(config), ex(ex) {}
+ virtual ~OnExceptionRunnable() {}
+
+ virtual void run() {
+ try {
+ // Mark this Connection as having a Failed transport.
+ this->connection->setFirstFailureError(ex);
+
+ try{
+ this->config->transport->stop();
+ } catch(...) {
+ }
+
+ this->config->brokerInfoReceived->countDown();
+
+ // Clean up the Connection resources.
+ this->connection->cleanup();
+
+ Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator() );
+
+ while( iter->hasNext() ) {
+ try{
+ iter->next()->onException(ex);
+ } catch(...) {}
+ }
+ } catch(Exception& ex) {}
+ }
+ };
+
}}
////////////////////////////////////////////////////////////////////////////////
@@ -253,7 +343,7 @@ ActiveMQConnection::~ActiveMQConnection() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::addDispatcher(
- const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher ) {
+ const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher) {
try{
synchronized(&this->config->dispatchers) {
@@ -266,7 +356,7 @@ void ActiveMQConnection::addDispatcher(
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::removeDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer) {
- try{
+ try {
synchronized(&this->config->dispatchers) {
this->config->dispatchers.remove(consumer);
}
@@ -433,6 +523,13 @@ void ActiveMQConnection::close() {
}
}
+ try {
+ if (this->config->executor != NULL) {
+ this->config->executor->shutdown();
+ }
+ } catch(Exception& ex) {
+ }
+
// Now inform the Broker we are shutting down.
this->disconnect(lastDeliveredSequenceId);
@@ -447,7 +544,7 @@ void ActiveMQConnection::close() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::cleanup() {
- try{
+ try {
// Get the complete list of active sessions.
std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
@@ -483,7 +580,7 @@ void ActiveMQConnection::cleanup() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::start() {
- try{
+ try {
checkClosedOrFailed();
ensureConnectionInfoSent();
@@ -528,7 +625,7 @@ void ActiveMQConnection::stop() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::disconnect(long long lastDeliveredSequenceId) {
- try{
+ try {
// Clear the listener, we don't care about async errors at this point.
this->config->transport->setTransportListener(NULL);
@@ -600,7 +697,7 @@ void ActiveMQConnection::disconnect(long long lastDeliveredSequenceId) {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::sendPullRequest( const ConsumerInfo* consumer, long long timeout ) {
+void ActiveMQConnection::sendPullRequest(const ConsumerInfo* consumer, long long timeout) {
try {
@@ -622,7 +719,7 @@ void ActiveMQConnection::sendPullRequest( const ConsumerInfo* consumer, long lon
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::destroyDestination(const ActiveMQDestination* destination) {
- try{
+ try {
if (destination == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL");
@@ -650,7 +747,7 @@ void ActiveMQConnection::destroyDestination(const ActiveMQDestination* destinati
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::destroyDestination(const cms::Destination* destination) {
- try{
+ try {
if (destination == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL");
@@ -672,9 +769,9 @@ void ActiveMQConnection::destroyDestination(const cms::Destination* destination)
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onCommand( const Pointer<Command>& command ) {
+void ActiveMQConnection::onCommand(const Pointer<Command>& command) {
- try{
+ try {
if (command->isMessageDispatch()) {
@@ -755,56 +852,24 @@ void ActiveMQConnection::onException( const decaf::lang::Exception& ex ) {
try {
- // We're disconnected - the asynchronous error is expected.
- if( this->isClosed() || this->closing.get() ) {
- return;
- }
-
onAsyncException(ex);
- // TODO This part should be done in a separate Thread.
-
- // Mark this Connection as having a Failed transport.
- this->transportFailed.set( true );
- if( this->config->firstFailureError == NULL ) {
- this->config->firstFailureError.reset( ex.clone() );
- }
-
- this->config->brokerInfoReceived->countDown();
-
- // TODO - Until this fires in another thread we can't dipose of
- // the transport here since it could result in this method
- // being called again recursively.
- try{
- // this->config->transport->stop();
- } catch(...) {
- }
-
- // Clean up the Connection resources.
- cleanup();
-
- Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator() );
-
- while( iter->hasNext() ) {
- try{
- iter->next()->onException( ex );
- } catch(...) {}
+ // We're disconnected - the asynchronous error is expected.
+ if (!this->isClosed() || !this->closing.get()) {
+ this->config->executor->execute(new OnExceptionRunnable(this, config, ex.clone()));
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onAsyncException( const decaf::lang::Exception& ex ) {
+void ActiveMQConnection::onAsyncException(const decaf::lang::Exception& ex) {
- if( !this->isClosed() || !this->closing.get() ) {
+ if (!this->isClosed() || !this->closing.get()) {
- if( this->config->exceptionListener != NULL ) {
-
- // Inform the user of the error.
- // TODO - This should be run by another Thread, i.e. Executor
- fire( exceptions::ActiveMQException( ex ) );
+ if (this->config->exceptionListener != NULL) {
+ this->config->executor->execute(new OnAsyncExceptionRunnable(this, ex));
}
}
}
@@ -843,7 +908,7 @@ void ActiveMQConnection::transportResumed() {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::oneway( Pointer<Command> command ) {
+void ActiveMQConnection::oneway(Pointer<Command> command) {
try {
checkClosedOrFailed();
@@ -856,7 +921,7 @@ void ActiveMQConnection::oneway( Pointer<Command> command ) {
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ActiveMQConnection::syncRequest( Pointer<Command> command, unsigned int timeout ) {
+Pointer<Response> ActiveMQConnection::syncRequest(Pointer<Command> command, unsigned int timeout) {
try {
@@ -912,7 +977,7 @@ void ActiveMQConnection::checkClosedOrFailed() const {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::ensureConnectionInfoSent() {
- try{
+ try {
// Can we skip sending the ConnectionInfo packet, cheap test
if (this->config->isConnectionInfoSentToBroker || closed.get()) {
@@ -1033,7 +1098,7 @@ void ActiveMQConnection::signalInterruptionProcessingComplete() {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setUsername( const std::string& username ) {
+void ActiveMQConnection::setUsername(const std::string& username) {
this->config->connectionInfo->setUserName( username );
}
@@ -1043,7 +1108,7 @@ const std::string& ActiveMQConnection::getUsername() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setPassword( const std::string& password ){
+void ActiveMQConnection::setPassword(const std::string& password) {
this->config->connectionInfo->setPassword( password );
}
@@ -1053,7 +1118,7 @@ const std::string& ActiveMQConnection::getPassword() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setBrokerURL( const std::string& brokerURL ){
+void ActiveMQConnection::setBrokerURL(const std::string& brokerURL) {
this->config->brokerURL = brokerURL;
}
@@ -1063,7 +1128,7 @@ const std::string& ActiveMQConnection::getBrokerURL() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setExceptionListener( cms::ExceptionListener* listener ) {
+void ActiveMQConnection::setExceptionListener(cms::ExceptionListener* listener) {
this->config->exceptionListener = listener;
}
@@ -1073,8 +1138,8 @@ cms::ExceptionListener* ActiveMQConnection::getExceptionListener() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setPrefetchPolicy( PrefetchPolicy* policy ) {
- this->config->defaultPrefetchPolicy.reset( policy );
+void ActiveMQConnection::setPrefetchPolicy(PrefetchPolicy* policy) {
+ this->config->defaultPrefetchPolicy.reset(policy);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1083,8 +1148,8 @@ PrefetchPolicy* ActiveMQConnection::getPrefetchPolicy() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
- this->config->defaultRedeliveryPolicy.reset( policy );
+void ActiveMQConnection::setRedeliveryPolicy(RedeliveryPolicy* policy) {
+ this->config->defaultRedeliveryPolicy.reset(policy);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1108,7 +1173,7 @@ bool ActiveMQConnection::isAlwaysSyncSend() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setAlwaysSyncSend( bool value ) {
+void ActiveMQConnection::setAlwaysSyncSend(bool value) {
this->config->alwaysSyncSend = value;
}
@@ -1118,7 +1183,7 @@ bool ActiveMQConnection::isUseAsyncSend() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setUseAsyncSend( bool value ) {
+void ActiveMQConnection::setUseAsyncSend(bool value) {
this->config->useAsyncSend = value;
}
@@ -1128,7 +1193,7 @@ bool ActiveMQConnection::isUseCompression() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setUseCompression( bool value ) {
+void ActiveMQConnection::setUseCompression(bool value) {
this->config->useCompression = value;
}
@@ -1138,13 +1203,13 @@ int ActiveMQConnection::getCompressionLevel() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setCompressionLevel( int value ) {
+void ActiveMQConnection::setCompressionLevel(int value) {
- if( value < 0 ) {
+ if (value < 0) {
this->config->compressionLevel = -1;
}
- this->config->compressionLevel = Math::min( value, 9 );
+ this->config->compressionLevel = Math::min(value, 9);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1153,7 +1218,7 @@ unsigned int ActiveMQConnection::getSendTimeout() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setSendTimeout( unsigned int timeout ) {
+void ActiveMQConnection::setSendTimeout(unsigned int timeout) {
this->config->sendTimeout = timeout;
}
@@ -1163,7 +1228,7 @@ unsigned int ActiveMQConnection::getCloseTimeout() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setCloseTimeout( unsigned int timeout ) {
+void ActiveMQConnection::setCloseTimeout(unsigned int timeout) {
this->config->closeTimeout = timeout;
}
@@ -1173,7 +1238,7 @@ unsigned int ActiveMQConnection::getProducerWindowSize() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setProducerWindowSize( unsigned int windowSize ) {
+void ActiveMQConnection::setProducerWindowSize(unsigned int windowSize) {
this->config->producerWindowSize = windowSize;
}
@@ -1189,7 +1254,7 @@ long long ActiveMQConnection::getNextLocalTransactionId() {
////////////////////////////////////////////////////////////////////////////////
transport::Transport& ActiveMQConnection::getTransport() const {
- return *( this->config->transport );
+ return *(this->config->transport);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1203,11 +1268,23 @@ bool ActiveMQConnection::isMessagePrioritySupported() const {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setMessagePrioritySupported( bool value ) {
+void ActiveMQConnection::setMessagePrioritySupported(bool value) {
this->config->messagePrioritySupported = value;
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setFirstFailureError(decaf::lang::Exception* error) {
+
+ this->transportFailed.set(true);
+
+ if (this->config->firstFailureError == NULL) {
+ this->config->firstFailureError.reset(error);
+ } else {
+ delete error;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
decaf::lang::Exception* ActiveMQConnection::getFirstFailureError() const {
return this->config->firstFailureError.get();
}
@@ -1217,7 +1294,7 @@ std::string ActiveMQConnection::getResourceManagerId() const {
try {
this->config->waitForBrokerInfo();
- if( this->config->brokerInfo == NULL ) {
+ if (this->config->brokerInfo == NULL) {
throw CMSException("Connection failed before Broker info was received.");
}
@@ -1228,5 +1305,10 @@ std::string ActiveMQConnection::getResourceManagerId() const {
////////////////////////////////////////////////////////////////////////////////
const decaf::util::Properties& ActiveMQConnection::getProperties() const {
- return *( this->config->properties );
+ return *(this->config->properties);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorService* ActiveMQConnection::getExecutor() const {
+ return this->config->executor.get();
}
View
16 activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -35,6 +35,7 @@
#include <decaf/util/Properties.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
+#include <decaf/util/concurrent/ExecutorService.h>
#include <decaf/lang/exceptions/UnsupportedOperationException.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
@@ -653,9 +654,17 @@ namespace core{
void setTransportInterruptionProcessingComplete();
/**
+ * Sets the pointer to the first exception that caused the Connection to become failed.
+ *
+ * @param pointer to the exception instance that is to be the first failure error if the
+ * first error is already set this value is deleted.
+ */
+ void setFirstFailureError(decaf::lang::Exception* error);
+
+ /**
* Gets the pointer to the first exception that caused the Connection to become failed.
*
- * @returns pointer to and Exception instance or NULL if none is set.
+ * @returns pointer to an Exception instance or NULL if none is set.
*/
decaf::lang::Exception* getFirstFailureError() const;
@@ -686,6 +695,11 @@ namespace core{
*/
void ensureConnectionInfoSent();
+ /**
+ * @returns the ExecutorService used to run jobs for this Connection
+ */
+ decaf::util::concurrent::ExecutorService* getExecutor() const;
+
protected:
/**
View
1 activemq-cpp/src/main/decaf/util/concurrent/Executor.h
@@ -101,7 +101,6 @@ namespace concurrent {
*
* @throws NullPointerException if command is null
*/
-
virtual void execute(decaf::lang::Runnable* command) = 0;
/**

0 comments on commit ee0810f

Please sign in to comment.