diff --git a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageHeaderGenerator.java b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageHeaderGenerator.java index 1135da97a..21b55099e 100644 --- a/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageHeaderGenerator.java +++ b/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageHeaderGenerator.java @@ -72,6 +72,19 @@ protected void generateAdditonalMembers( PrintWriter out ) { super.generateAdditonalMembers(out); + + out.println(" /**"); + out.println(" * Create a Pointer based copy of this message. Useful for chaining a clone"); + out.println(" * operation with other operation such as casting to a cms Message type."); + out.println(" *"); + out.println(" * Pointer cmsMsg = message->copy().dynamic_cast();"); + out.println(" *"); + out.println(" * @returns a Pointer which is a duplicate of this object."); + out.println(" */"); + out.println(" Pointer copy() const {"); + out.println(" return Pointer(this->cloneDataStructure());"); + out.println(" }"); + out.println(""); out.println(" /**"); out.println(" * Handles the marshaling of the objects properties into the"); out.println(" * internal byte array before the object is marshaled to the"); diff --git a/activemq-cpp/configure.ac b/activemq-cpp/configure.ac index 67b4051de..48582e0af 100644 --- a/activemq-cpp/configure.ac +++ b/activemq-cpp/configure.ac @@ -38,8 +38,8 @@ ACTIVEMQ_API_VERSION=${ACTIVEMQ_VERSION} ## ------------------------------------------------------------ ## Define the Version variables for the CMS API Library ## ------------------------------------------------------------ -CMSAPI_MAJOR_VERSION=2 -CMSAPI_MINOR_VERSION=4 +CMSAPI_MAJOR_VERSION=3 +CMSAPI_MINOR_VERSION=0 CMSAPI_VERSION=${CMSAPI_MAJOR_VERSION}.${CMSAPI_MINOR_VERSION} AC_SUBST(CMSAPI_MAJOR_VERSION) diff --git a/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h b/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h index 0157d8b09..3317f7fd3 100644 --- a/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h +++ b/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h @@ -84,6 +84,14 @@ namespace cmsutil { return consumer->getMessageSelector(); } + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + consumer->setMessageTransformer(transformer); + } + + virtual cms::MessageTransformer* getMessageTransformer() const { + return consumer->getMessageTransformer(); + } + }; }} diff --git a/activemq-cpp/src/main/activemq/cmsutil/CachedProducer.h b/activemq-cpp/src/main/activemq/cmsutil/CachedProducer.h index 54e0d15fc..d601b98a1 100644 --- a/activemq-cpp/src/main/activemq/cmsutil/CachedProducer.h +++ b/activemq-cpp/src/main/activemq/cmsutil/CachedProducer.h @@ -116,6 +116,14 @@ namespace cmsutil { return producer->getTimeToLive(); } + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + producer->setMessageTransformer(transformer); + } + + virtual cms::MessageTransformer* getMessageTransformer() const { + return producer->getMessageTransformer(); + } + }; }} diff --git a/activemq-cpp/src/main/activemq/cmsutil/PooledSession.h b/activemq-cpp/src/main/activemq/cmsutil/PooledSession.h index 5532ee017..223b4cdc6 100644 --- a/activemq-cpp/src/main/activemq/cmsutil/PooledSession.h +++ b/activemq-cpp/src/main/activemq/cmsutil/PooledSession.h @@ -225,6 +225,14 @@ namespace cmsutil { session->unsubscribe( name ); } + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + session->setMessageTransformer(transformer); + } + + virtual cms::MessageTransformer* getMessageTransformer() const { + return session->getMessageTransformer(); + } + private: std::string getUniqueDestName( const cms::Destination* dest ); diff --git a/activemq-cpp/src/main/activemq/commands/Message.h b/activemq-cpp/src/main/activemq/commands/Message.h index 48729ab49..315d01f1f 100644 --- a/activemq-cpp/src/main/activemq/commands/Message.h +++ b/activemq-cpp/src/main/activemq/commands/Message.h @@ -136,6 +136,18 @@ namespace commands{ virtual bool equals( const DataStructure* value ) const; + /** + * Create a Pointer based copy of this message. Useful for chaining a clone + * operation with other operation such as casting to a cms Message type. + * + * Pointer cmsMsg = message->copy().dynamic_cast(); + * + * @returns a Pointer which is a duplicate of this object. + */ + Pointer copy() const { + return Pointer(this->cloneDataStructure()); + } + /** * Handles the marshaling of the objects properties into the * internal byte array before the object is marshaled to the diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp index e8a7f6ffb..94214562e 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp @@ -145,6 +145,7 @@ namespace core{ std::auto_ptr defaultRedeliveryPolicy; cms::ExceptionListener* exceptionListener; + cms::MessageTransformer* transformer; Pointer connectionInfo; Pointer brokerInfo; @@ -190,6 +191,7 @@ namespace core{ defaultPrefetchPolicy(NULL), defaultRedeliveryPolicy(NULL), exceptionListener(NULL), + transformer(NULL), connectionInfo(), brokerInfo(), brokerWireFormatInfo(), @@ -400,6 +402,8 @@ cms::Session* ActiveMQConnection::createSession(cms::Session::AcknowledgeMode ac Pointer session(new ActiveMQSessionKernel( this, getNextSessionId(), ackMode, *this->config->properties)); + session->setMessageTransformer(this->config->transformer); + this->addSession(session); return new ActiveMQSession(session); @@ -1209,6 +1213,16 @@ cms::ExceptionListener* ActiveMQConnection::getExceptionListener() const { return this->config->exceptionListener; } +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConnection::setMessageTransformer(cms::MessageTransformer* transformer) { + this->config->transformer = transformer; +} + +//////////////////////////////////////////////////////////////////////////////// +cms::MessageTransformer* ActiveMQConnection::getMessageTransformer() const { + return this->config->transformer; +} + //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::setPrefetchPolicy(PrefetchPolicy* policy) { this->config->defaultPrefetchPolicy.reset(policy); diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h index 1f8917a84..d3546091d 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h @@ -295,6 +295,16 @@ namespace core{ */ virtual void setExceptionListener(cms::ExceptionListener* listener); + /** + * {@inheritDoc} + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer); + + /** + * {@inheritDoc} + */ + virtual cms::MessageTransformer* getMessageTransformer() const; + public: // Configuration Options /** diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp index dec38c920..692447d86 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp @@ -16,6 +16,7 @@ */ #include "ActiveMQConnectionFactory.h" +#include #include #include #include @@ -81,6 +82,7 @@ namespace core{ unsigned int producerWindowSize; cms::ExceptionListener* defaultListener; + cms::MessageTransformer* defaultTransformer; std::auto_ptr defaultPrefetchPolicy; std::auto_ptr defaultRedeliveryPolicy; @@ -100,6 +102,7 @@ namespace core{ closeTimeout(15000), producerWindowSize(0), defaultListener(NULL), + defaultTransformer(NULL), defaultPrefetchPolicy(new DefaultPrefetchPolicy()), defaultRedeliveryPolicy(new DefaultRedeliveryPolicy()) { } @@ -370,6 +373,10 @@ void ActiveMQConnectionFactory::configureConnection(ActiveMQConnection* connecti if (this->settings->defaultListener) { connection->setExceptionListener(this->settings->defaultListener); } + + if (this->settings->defaultTransformer) { + connection->setMessageTransformer(this->settings->defaultTransformer); + } } //////////////////////////////////////////////////////////////////////////////// @@ -427,6 +434,16 @@ cms::ExceptionListener* ActiveMQConnectionFactory::getExceptionListener() const return this->settings->defaultListener; } +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConnectionFactory::setMessageTransformer(cms::MessageTransformer* transformer) { + this->settings->defaultTransformer = transformer; +} + +//////////////////////////////////////////////////////////////////////////////// +cms::MessageTransformer* ActiveMQConnectionFactory::getMessageTransformer() const { + return this->settings->defaultTransformer; +} + //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnectionFactory::setPrefetchPolicy(PrefetchPolicy* policy) { this->settings->defaultPrefetchPolicy.reset(policy); diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h index ad8947c82..86a98b6f1 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h @@ -188,7 +188,7 @@ namespace core{ * @param listener * The listener to set on the connection or NULL for no listener. */ - void setExceptionListener(cms::ExceptionListener* listener); + virtual void setExceptionListener(cms::ExceptionListener* listener); /** * Returns the currently set ExceptionListener that will be set on any new Connection @@ -196,7 +196,23 @@ namespace core{ * * @return a pointer to a CMS ExceptionListener instance or NULL if not set. */ - cms::ExceptionListener* getExceptionListener() const; + virtual cms::ExceptionListener* getExceptionListener() const; + + /** + * Set an MessageTransformer instance that is passed on to all Connection objects created from + * this ConnectionFactory + * + * @param transformer + * Pointer to the cms::MessageTransformer to set on all newly created Connection objects. + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer); + + /** + * Gets the currently configured MessageTransformer for this ConnectionFactory. + * + * @returns the pointer to the currently set cms::MessageTransformer. + */ + virtual cms::MessageTransformer* getMessageTransformer() const; /** * Sets the PrefetchPolicy instance that this factory should use when it creates diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp index 98d14bcd5..627e7fec4 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp @@ -189,3 +189,13 @@ const Pointer& ActiveMQConsumer::getConsumerId() const { decaf::lang::Exception* ActiveMQConsumer::getFailureError() const { return this->config->kernel->getFailureError(); } + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumer::setMessageTransformer(cms::MessageTransformer* transformer) { + this->config->kernel->setMessageTransformer(transformer); +} + +//////////////////////////////////////////////////////////////////////////////// +cms::MessageTransformer* ActiveMQConsumer::getMessageTransformer() const { + return this->config->kernel->getMessageTransformer(); +} diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h b/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h index 12964b857..d12bd9784 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h @@ -81,6 +81,10 @@ namespace core{ virtual std::string getMessageSelector() const; + virtual void setMessageTransformer(cms::MessageTransformer* transformer); + + virtual cms::MessageTransformer* getMessageTransformer() const; + public: /** diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h b/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h index 8e87cc923..59487ffdf 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h +++ b/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h @@ -165,6 +165,14 @@ namespace core{ return this->kernel->getSendTimeout(); } + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + this->kernel->setMessageTransformer(transformer); + } + + virtual cms::MessageTransformer* getMessageTransformer() const { + return this->kernel->getMessageTransformer(); + } + public: /** diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQSession.h b/activemq-cpp/src/main/activemq/core/ActiveMQSession.h index 8d07e9a47..680de6427 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQSession.h +++ b/activemq-cpp/src/main/activemq/core/ActiveMQSession.h @@ -149,6 +149,26 @@ namespace core{ return this->kernel->getExceptionListener(); } + /** + * Set an MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer + * objects created from this Session. + * + * @param transformer + * Pointer to the cms::MessageTransformer to set on all MessageConsumers and MessageProducers. + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + this->kernel->setMessageTransformer(transformer); + } + + /** + * Gets the currently configured MessageTransformer for this Session. + * + * @returns the pointer to the currently set cms::MessageTransformer. + */ + virtual cms::MessageTransformer* getMessageTransformer() const { + return this->kernel->getMessageTransformer(); + } + /** * Gets the Session Information object for this session, if the * session is closed than this method throws an exception. diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp index 95d050fff..50ffaa9cd 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -46,6 +47,7 @@ #include #include #include +#include #include using namespace std; @@ -75,6 +77,7 @@ namespace kernels { public: cms::MessageListener* listener; + cms::MessageTransformer* transformer; decaf::util::concurrent::Mutex listenerMutex; AtomicBoolean deliveringAcks; AtomicBoolean started; @@ -93,6 +96,7 @@ namespace kernels { Pointer scheduler; ActiveMQConsumerKernelConfig() : listener(NULL), + transformer(NULL), listenerMutex(), deliveringAcks(), started(), @@ -311,7 +315,9 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session, bool noLocal, bool browser, bool dispatchAsync, - cms::MessageListener* listener) : internal(NULL), session(NULL), consumerInfo() { + cms::MessageListener* listener) : internal(NULL), + session(NULL), + consumerInfo() { if (session == NULL) { throw IllegalArgumentException(__FILE__, __LINE__, "Consumer created with NULL Session"); @@ -598,19 +604,12 @@ cms::Message* ActiveMQConsumerKernel::receive() { return NULL; } - // Message pre-processing beforeMessageIsConsumed(message); - - // Need to clone the message because the user is responsible for freeing - // its copy of the message. - cms::Message* clonedMessage = - dynamic_cast(message->getMessage()->cloneDataStructure()); - - // Post processing (may result in the message being deleted) afterMessageIsConsumed(message, false); - // Return the cloned message. - return clonedMessage; + // Need to clone the message because the user is responsible for freeing + // its copy of the message, createCMSMessage will do this for us. + return createCMSMessage(message).release(); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } @@ -631,19 +630,12 @@ cms::Message* ActiveMQConsumerKernel::receive( int millisecs ) { return NULL; } - // Message preprocessing beforeMessageIsConsumed(message); - - // Need to clone the message because the user is responsible for freeing - // its copy of the message. - cms::Message* clonedMessage = - dynamic_cast(message->getMessage()->cloneDataStructure()); - - // Post processing (may result in the message being deleted) afterMessageIsConsumed(message, false); - // Return the cloned message. - return clonedMessage; + // Need to clone the message because the user is responsible for freeing + // its copy of the message, createCMSMessage will do this for us. + return createCMSMessage(message).release(); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } @@ -664,25 +656,18 @@ cms::Message* ActiveMQConsumerKernel::receiveNoWait() { return NULL; } - // Message preprocessing beforeMessageIsConsumed(message); - - // Need to clone the message because the user is responsible for freeing - // its copy of the message. - cms::Message* clonedMessage = - dynamic_cast(message->getMessage()->cloneDataStructure()); - - // Post processing (may result in the message being deleted) afterMessageIsConsumed(message, false); - // Return the cloned message. - return clonedMessage; + // Need to clone the message because the user is responsible for freeing + // its copy of the message, createCMSMessage will do this for us. + return createCMSMessage(message).release(); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumerKernel::setMessageListener( cms::MessageListener* listener ) { +void ActiveMQConsumerKernel::setMessageListener(cms::MessageListener* listener) { try { @@ -722,16 +707,6 @@ void ActiveMQConsumerKernel::setMessageListener( cms::MessageListener* listener //////////////////////////////////////////////////////////////////////////////// void ActiveMQConsumerKernel::beforeMessageIsConsumed(const Pointer& dispatch) { - // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then - // we set the handler in the message to this object and send it out. - if (session->isClientAcknowledge()) { - Pointer ackHandler(new ClientAckHandler(this->session)); - dispatch->getMessage()->setAckHandler(ackHandler); - } else if (session->isIndividualAcknowledge()) { - Pointer ackHandler(new IndividualAckHandler(this, dispatch)); - dispatch->getMessage()->setAckHandler(ackHandler); - } - this->internal->lastDeliveredSequenceId = dispatch->getMessage()->getMessageId()->getBrokerSequenceId(); if (!isAutoAcknowledgeBatch()) { @@ -1102,8 +1077,7 @@ void ActiveMQConsumerKernel::dispatch(const Pointer& dispatch) clearMessagesInProgress(); if (this->internal->clearDispatchList) { - // we are reconnecting so lets flush the in progress - // messages + // we are reconnecting so lets flush the in progress messages this->internal->clearDispatchList = false; this->internal->unconsumedMessages->clear(); } @@ -1113,8 +1087,6 @@ void ActiveMQConsumerKernel::dispatch(const Pointer& dispatch) // Don't dispatch expired messages, ack it and then destroy it if (dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired()) { this->ackLater(dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED); - - // stop now, don't queue return; } @@ -1123,9 +1095,9 @@ void ActiveMQConsumerKernel::dispatch(const Pointer& dispatch) // If we have a listener, send the message. if (this->internal->listener != NULL && internal->unconsumedMessages->isRunning()) { + Pointer message = createCMSMessage(dispatch); beforeMessageIsConsumed(dispatch); - this->internal->listener->onMessage( - dynamic_cast (dispatch->getMessage().get())); + this->internal->listener->onMessage(message.get()); afterMessageIsConsumed(dispatch, false); } else { @@ -1143,6 +1115,55 @@ void ActiveMQConsumerKernel::dispatch(const Pointer& dispatch) AMQ_CATCHALL_THROW( ActiveMQException ) } +//////////////////////////////////////////////////////////////////////////////// +Pointer ActiveMQConsumerKernel::createCMSMessage(Pointer dispatch) { + + try { + + Pointer message = dispatch->getMessage()->copy(); + if (this->internal->transformer != NULL) { + cms::Message* source = dynamic_cast(message.get()); + cms::Message* transformed = NULL; + + if (this->internal->transformer->consumerTransform( + (cms::Session*)this->session, (cms::MessageConsumer*)this, source, &transformed)) { + + if (transformed == NULL) { + throw NullPointerException(__FILE__, __LINE__, "Client MessageTransformer returned a NULL message"); + } + + Message* amqMessage = NULL; + + // If the transform create a new ActiveMQ Message command then we can discard the transformed + // cms::Message here, otherwise the transformed message was already an ActiveMQ Message + // command of some sort so we just place casted amqMessage in our Pointer and let it get + // cleaned up after its been dispatched. + if (ActiveMQMessageTransformation::transformMessage(transformed, this->session->getConnection(), &amqMessage)){ + delete transformed; + } + + message.reset(amqMessage); + } + } + + // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then + // we set the handler in the message to this object and send it out. + if (session->isClientAcknowledge()) { + Pointer ackHandler(new ClientAckHandler(this->session)); + message->setAckHandler(ackHandler); + } else if (session->isIndividualAcknowledge()) { + Pointer ackHandler(new IndividualAckHandler(this, dispatch)); + message->setAckHandler(ackHandler); + } + + return message.dynamicCast(); + } + AMQ_CATCH_RETHROW( cms::CMSException ) + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} + //////////////////////////////////////////////////////////////////////////////// void ActiveMQConsumerKernel::sendPullRequest(long long timeout) { @@ -1316,6 +1337,16 @@ cms::MessageListener* ActiveMQConsumerKernel::getMessageListener() const { return this->internal->listener; } +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::setMessageTransformer(cms::MessageTransformer* transformer) { + this->internal->transformer = transformer; +} + +//////////////////////////////////////////////////////////////////////////////// +cms::MessageTransformer* ActiveMQConsumerKernel::getMessageTransformer() const { + return this->internal->transformer; +} + //////////////////////////////////////////////////////////////////////////////// const Pointer& ActiveMQConsumerKernel::getConsumerInfo() const { this->checkClosed(); diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h index eba38d41f..aa31e454c 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h @@ -106,6 +106,10 @@ namespace kernels { virtual void acknowledge(const Pointer& dispatch); + virtual void setMessageTransformer(cms::MessageTransformer* transformer); + + virtual cms::MessageTransformer* getMessageTransformer() const; + public: // Dispatcher Methods virtual void dispatch( const Pointer& message ); @@ -303,6 +307,10 @@ namespace kernels { private: + // Creates a deliverable cms::Message from a received MessageDispatch, transforming if needed + // and configuring appropriate ack handlers. + Pointer createCMSMessage(Pointer dispatch); + // Using options from the Destination URI override any settings that are // defined for this consumer. void applyDestinationOptions(const Pointer& info); diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp index 1114e3f07..47e8450f9 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp @@ -202,6 +202,20 @@ void ActiveMQProducerKernel::send(const cms::Destination* destination, cms::Mess throw cms::CMSException("No destination specified", NULL); } + cms::Message* outbound = message; + Pointer scopedMessage; + if (this->transformer != NULL) { + if (this->transformer->producerTransform(this->session, this, message, &outbound)) { + // scopedMessage ensures that when we are responsible for the lifetime of the + // transformed message, the message remains valid until the send operation either + // succeeds or throws an exception. + scopedMessage.reset(outbound); + } + if (outbound == NULL) { + throw NullPointerException(__FILE__, __LINE__, "MessageTransformer set transformed message to NULL"); + } + } + if (this->memoryUsage.get() != NULL) { try { this->memoryUsage->waitForSpace(); @@ -210,7 +224,7 @@ void ActiveMQProducerKernel::send(const cms::Destination* destination, cms::Mess } } - this->session->send(this, dest, message, deliveryMode, priority, timeToLive, + this->session->send(this, dest, outbound, deliveryMode, priority, timeToLive, this->memoryUsage.get(), this->sendTimeout); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h index 0f0671c90..fa3761f83 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -79,6 +80,9 @@ namespace kernels { // Generator of Message Sequence Id numbers for this producer. util::LongSequenceGenerator messageSequence; + // Used to tranform Message before sending them to the CMS bus. + cms::MessageTransformer* transformer; + private: ActiveMQProducerKernel(const ActiveMQProducerKernel&); @@ -91,6 +95,8 @@ namespace kernels { * * @param session * The Session which is the parent of this Producer. + * @param parent + * Pointer to the cms::MessageProducer that will wrap this kernel object. * @param producerId * Pointer to a ProducerId object which identifies this producer. * @param destination @@ -119,6 +125,26 @@ namespace kernels { virtual void send(const cms::Destination* destination, cms::Message* message, int deliveryMode, int priority, long long timeToLive); + /** + * Set an MessageTransformer instance that is applied to all cms::Message objects before they + * are sent on to the CMS bus. + * + * @param transformer + * Pointer to the cms::MessageTransformer to apply on each cms:;MessageSend. + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + this->transformer = transformer; + } + + /** + * Gets the currently configured MessageTransformer for this MessageProducer. + * + * @returns the pointer to the currently set cms::MessageTransformer. + */ + virtual cms::MessageTransformer* getMessageTransformer() const { + return this->transformer; + } + /** * Sets the delivery mode for this Producer * @param mode - The DeliveryMode to use for Message sends. diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp index 86209ede6..7bcdec22a 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp @@ -102,10 +102,13 @@ namespace kernels{ Pointer closeSync; ConsumersMap consumers; Mutex sendMutex; + cms::MessageTransformer* transformer; public: - SessionConfig() : synchronizationRegistered(false), producers(), scheduler(), closeSync(), consumers(), sendMutex() {} + SessionConfig() : synchronizationRegistered(false), + producers(), scheduler(), closeSync(), + consumers(), sendMutex(), transformer(NULL) {} ~SessionConfig() {} }; @@ -537,6 +540,8 @@ cms::MessageConsumer* ActiveMQSessionKernel::createConsumer(const cms::Destinati throw ex; } + consumer->setMessageTransformer(this->config->transformer); + if (this->connection->isStarted()) { consumer->start(); } @@ -579,6 +584,8 @@ cms::MessageConsumer* ActiveMQSessionKernel::createDurableConsumer(const cms::To throw ex; } + consumer->setMessageTransformer(this->config->transformer); + if (this->connection->isStarted()) { consumer->start(); } @@ -627,6 +634,8 @@ cms::MessageProducer* ActiveMQSessionKernel::createProducer( const cms::Destinat throw ex; } + producer->setMessageTransformer(this->config->transformer); + return new ActiveMQProducer(producer); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() @@ -941,6 +950,16 @@ cms::ExceptionListener* ActiveMQSessionKernel::getExceptionListener() { return NULL; } +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQSessionKernel::setMessageTransformer(cms::MessageTransformer* transformer) { + this->config->transformer = transformer; +} + +//////////////////////////////////////////////////////////////////////////////// +cms::MessageTransformer* ActiveMQSessionKernel::getMessageTransformer() const { + return this->config->transformer; +} + //////////////////////////////////////////////////////////////////////////////// Pointer ActiveMQSessionKernel::getScheduler() const { return this->config->scheduler; diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h index 6992f0693..e0fdf094a 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h @@ -298,6 +298,22 @@ namespace kernels { */ cms::ExceptionListener* getExceptionListener(); + /** + * Set an MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer + * objects created from this Session. + * + * @param transformer + * Pointer to the cms::MessageTransformer to set on all MessageConsumers and MessageProducers. + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer); + + /** + * Gets the currently configured MessageTransformer for this Session. + * + * @returns the pointer to the currently set cms::MessageTransformer. + */ + virtual cms::MessageTransformer* getMessageTransformer() const; + /** * Gets the Session Information object for this session, if the * session is closed than this method throws an exception. diff --git a/activemq-cpp/src/main/cms/Connection.h b/activemq-cpp/src/main/cms/Connection.h index d39089159..5b459de75 100644 --- a/activemq-cpp/src/main/cms/Connection.h +++ b/activemq-cpp/src/main/cms/Connection.h @@ -28,6 +28,7 @@ namespace cms{ class ExceptionListener; + class MessageTransformer; /** * The client's connection to its provider. @@ -158,6 +159,26 @@ namespace cms{ */ virtual void setExceptionListener(ExceptionListener* listener) = 0; + /** + * Set an MessageTransformer instance that is passed on to all Session objects created from + * this Connection. + * + * The CMS code never takes ownership of the MessageTransformer pointer which implies that + * the client code must ensure that the object remains valid for the lifetime of the CMS + * object to which the MessageTransformer has been assigned. + * + * @param transformer + * Pointer to the cms::MessageTransformer to set on all newly created Session objects. + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0; + + /** + * Gets the currently configured MessageTransformer for this Connection. + * + * @returns the pointer to the currently set cms::MessageTransformer. + */ + virtual cms::MessageTransformer* getMessageTransformer() const = 0; + }; } diff --git a/activemq-cpp/src/main/cms/ConnectionFactory.h b/activemq-cpp/src/main/cms/ConnectionFactory.h index 1100ac038..0c674f304 100644 --- a/activemq-cpp/src/main/cms/ConnectionFactory.h +++ b/activemq-cpp/src/main/cms/ConnectionFactory.h @@ -18,13 +18,16 @@ #define _CMS_CONNECTIONFACTORY_H_ #include -#include #include #include namespace cms{ + class Connection; + class ExceptionListener; + class MessageTransformer; + /** * Defines the interface for a factory that creates connection objects, the Connection * objects returned implement the CMS Connection interface and hide the CMS Provider @@ -51,7 +54,7 @@ namespace cms{ * * @throws CMSException if an internal error occurs while creating the Connection. */ - virtual Connection* createConnection() = 0; + virtual cms::Connection* createConnection() = 0; /** * Creates a connection with the default specified identity. The @@ -96,6 +99,46 @@ namespace cms{ */ virtual cms::Connection* createConnection(const std::string& username, const std::string& password, const std::string& clientId) = 0; + /** + * Set an ExceptionListener instance that is passed on to all Connection objects created from + * this ConnectionFactory + * + * @param transformer + * Pointer to the cms::ExceptionListener to set on all newly created Connection objects/ + */ + virtual void setExceptionListener(cms::ExceptionListener* listener) = 0; + + /** + * Gets the currently configured ExceptionListener for this ConnectionFactory. + * + * The CMS code never takes ownership of the ExceptionListener pointer which implies that + * the client code must ensure that the object remains valid for the lifetime of the CMS + * object to which the ExceptionListener has been assigned. + * + * @returns the pointer to the currently set cms::ExceptionListener. + */ + virtual cms::ExceptionListener* getExceptionListener() const = 0; + + /** + * Set an MessageTransformer instance that is passed on to all Connection objects created from + * this ConnectionFactory + * + * The CMS code never takes ownership of the MessageTransformer pointer which implies that + * the client code must ensure that the object remains valid for the lifetime of the CMS + * object to which the MessageTransformer has been assigned. + * + * @param transformer + * Pointer to the cms::MessageTransformer to set on all newly created Connection objects. + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0; + + /** + * Gets the currently configured MessageTransformer for this ConnectionFactory. + * + * @returns the pointer to the currently set cms::MessageTransformer. + */ + virtual cms::MessageTransformer* getMessageTransformer() const = 0; + public: /** @@ -113,7 +156,7 @@ namespace cms{ * * @throws CMSException if an internal error occurs while creating the ConnectionFactory. */ - static ConnectionFactory* createCMSConnectionFactory(const std::string& brokerURI); + static cms::ConnectionFactory* createCMSConnectionFactory(const std::string& brokerURI); }; diff --git a/activemq-cpp/src/main/cms/MessageConsumer.h b/activemq-cpp/src/main/cms/MessageConsumer.h index 7c7b60954..fe8fce7a4 100644 --- a/activemq-cpp/src/main/cms/MessageConsumer.h +++ b/activemq-cpp/src/main/cms/MessageConsumer.h @@ -27,6 +27,8 @@ namespace cms{ + class MessageTransformer; + /** * A client uses a MessageConsumer to received messages * from a destination. @@ -120,6 +122,26 @@ namespace cms{ */ virtual std::string getMessageSelector() const = 0; + /** + * Set an MessageTransformer instance that is applied to all cms::Message objects before they + * are dispatched to client code. + * + * The CMS code never takes ownership of the MessageTransformer pointer which implies that + * the client code must ensure that the object remains valid for the lifetime of the CMS + * object to which the MessageTransformer has been assigned. + * + * @param transformer + * Pointer to the cms::MessageTransformer to apply on each cms:;Message dispatch. + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0; + + /** + * Gets the currently configured MessageTransformer for this MessageConsumer. + * + * @returns the pointer to the currently set cms::MessageTransformer. + */ + virtual cms::MessageTransformer* getMessageTransformer() const = 0; + }; } diff --git a/activemq-cpp/src/main/cms/MessageProducer.h b/activemq-cpp/src/main/cms/MessageProducer.h index 342c3f524..6d3f76e19 100644 --- a/activemq-cpp/src/main/cms/MessageProducer.h +++ b/activemq-cpp/src/main/cms/MessageProducer.h @@ -30,6 +30,8 @@ namespace cms{ + class MessageTransformer; + /** * A client uses a MessageProducer object to send messages to * a Destination. A MessageProducer object is created by @@ -237,6 +239,26 @@ namespace cms{ */ virtual long long getTimeToLive() const = 0; + /** + * Set an MessageTransformer instance that is applied to all cms::Message objects before they + * are sent on to the CMS bus. + * + * The CMS code never takes ownership of the MessageTransformer pointer which implies that + * the client code must ensure that the object remains valid for the lifetime of the CMS + * object to which the MessageTransformer has been assigned. + * + * @param transformer + * Pointer to the cms::MessageTransformer to apply on each cms:;MessageSend. + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0; + + /** + * Gets the currently configured MessageTransformer for this MessageProducer. + * + * @returns the pointer to the currently set cms::MessageTransformer. + */ + virtual cms::MessageTransformer* getMessageTransformer() const = 0; + }; } diff --git a/activemq-cpp/src/main/cms/MessageTransformer.cpp b/activemq-cpp/src/main/cms/MessageTransformer.cpp new file mode 100644 index 000000000..2ae672ce9 --- /dev/null +++ b/activemq-cpp/src/main/cms/MessageTransformer.cpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "MessageTransformer.h" + +using namespace cms; + +//////////////////////////////////////////////////////////////////////////////// +MessageTransformer::~MessageTransformer() { +} + diff --git a/activemq-cpp/src/main/cms/MessageTransformer.h b/activemq-cpp/src/main/cms/MessageTransformer.h new file mode 100644 index 000000000..cf3ec8c89 --- /dev/null +++ b/activemq-cpp/src/main/cms/MessageTransformer.h @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _CMS_MESSAGETRANSFORMER_H_ +#define _CMS_MESSAGETRANSFORMER_H_ + +#include + +namespace cms { + + class MessageProducer; + class MessageConsumer; + class Message; + class Session; + + /** + * Provides an interface for clients to transform cms::Message objects inside the + * CMS MessageProducer and MessageConsumer objects before the message's are sent or + * received. + * + * @since 3.0 + */ + class MessageTransformer { + public: + + virtual ~MessageTransformer(); + + /** + * Transforms the given message inside the producer before it is sent to the CMS bus. + * + * The contract of this method is that the resulting transformed message pointer is set if and + * only if the method actually creates a new cms::Message object, otherwise it must always be + * set to NULL. The return value indicates whether a transformation took place and indicates + * that the resulting transformed cms::Message pointer will need to be deleted once the + * producer has sent the cms::Message on to the CMS bus. + * + * @param session + * The Session used to create the target MessageProducer for this transformation. + * @param producer + * The MessageProducer instance that is going to handle sending the transformed Message. + * @param message + * The CMS Message object that is to be transformed by this method. + * @param transformed + * A pointer to the location in memory where the newly transformed Message has been allocated. + * + * @returns true if the MessageProducer should handle deleting the transformed Message once sent. + * + * @throws cms::CMSException if an error occurs during the transform operation. + */ + virtual bool producerTransform(cms::Session* session, cms::MessageProducer* producer, cms::Message* message, cms::Message** transformed) = 0; + + /** + * Transforms the given message inside the consumer before it is dispatched to the client code. + * + * The contract of this method is that the resulting transformed message pointer is set if and + * only if the method actually creates a new cms::Message object, otherwise it must always be + * set to NULL. The return value indicates whether a transformation took place and indicates + * that the resulting transformed cms::Message pointer will need to be deleted once the + * consumer completed message dispatch. + * + * @param session + * The Session used to create the target MessageConsumer for this transformation. + * @param consumer + * The MessageConsumer instance that is going to handle dispatching the transformed Message. + * @param message + * The CMS Message object that is to be transformed by this method. + * @param transformed + * A pointer to the location in memory where the newly transformed Message has been allocated. + * + * @returns true if the MessageConsumer should handle deleting the transformed Message once sent. + * + * @throws cms::CMSException if an error occurs during the transform operation. + */ + virtual bool consumerTransform(cms::Session* session, cms::MessageConsumer* consumer, cms::Message* message, cms::Message** transformed) = 0; + + }; + +} + +#endif /* _CMS_MESSAGETRANSFORMER_H_ */ diff --git a/activemq-cpp/src/main/cms/Session.h b/activemq-cpp/src/main/cms/Session.h index 460eeea12..ae3f8a278 100644 --- a/activemq-cpp/src/main/cms/Session.h +++ b/activemq-cpp/src/main/cms/Session.h @@ -37,6 +37,8 @@ namespace cms{ + class MessageTransformer; + /** * A Session object is a single-threaded context for producing and consuming * messages. @@ -443,6 +445,26 @@ namespace cms{ */ virtual void unsubscribe(const std::string& name) = 0; + /** + * Set an MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer + * objects created from this Session. + * + * The CMS code never takes ownership of the MessageTransformer pointer which implies that + * the client code must ensure that the object remains valid for the lifetime of the CMS + * object to which the MessageTransformer has been assigned. + * + * @param transformer + * Pointer to the cms::MessageTransformer to set on all MessageConsumers and MessageProducers. + */ + virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0; + + /** + * Gets the currently configured MessageTransformer for this Session. + * + * @returns the pointer to the currently set cms::MessageTransformer. + */ + virtual cms::MessageTransformer* getMessageTransformer() const = 0; + }; } diff --git a/activemq-cpp/src/test/activemq/cmsutil/DummyConnection.h b/activemq-cpp/src/test/activemq/cmsutil/DummyConnection.h index 23505ebd1..2cc16c409 100644 --- a/activemq-cpp/src/test/activemq/cmsutil/DummyConnection.h +++ b/activemq-cpp/src/test/activemq/cmsutil/DummyConnection.h @@ -26,19 +26,21 @@ namespace cmsutil { class MessageContext; - class DummyConnection : public cms::Connection { + class DummyConnection: public cms::Connection { private: cms::ExceptionListener* listener; + cms::MessageTransformer* transformer; std::string clientId; MessageContext* messageContext; public: - DummyConnection(MessageContext* messageContext ) { + DummyConnection(MessageContext* messageContext) { this->messageContext = messageContext; } - virtual ~DummyConnection() {} + virtual ~DummyConnection() { + } virtual const cms::ConnectionMetaData* getMetaData() const { return NULL; @@ -53,12 +55,11 @@ namespace cmsutil { virtual void stop() { } - virtual cms::Session* createSession() throw ( cms::CMSException ) { + virtual cms::Session* createSession() throw (cms::CMSException) { return new DummySession(messageContext); } - virtual cms::Session* createSession( cms::Session::AcknowledgeMode ackMode ) - throw ( cms::CMSException ) { + virtual cms::Session* createSession(cms::Session::AcknowledgeMode ackMode) throw (cms::CMSException) { DummySession* s = new DummySession(messageContext); s->setAcknowledgeMode(ackMode); @@ -69,7 +70,7 @@ namespace cmsutil { return clientId; } - virtual void setClientID( const std::string& id ) { + virtual void setClientID(const std::string& id) { this->clientId = id; } @@ -77,9 +78,17 @@ namespace cmsutil { return listener; } - virtual void setExceptionListener( cms::ExceptionListener* listener ) { + virtual void setExceptionListener(cms::ExceptionListener* listener) { this->listener = listener; } + + virtual cms::MessageTransformer* getMessageTransformer() const { + return transformer; + } + + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + this->transformer = transformer; + } }; }} diff --git a/activemq-cpp/src/test/activemq/cmsutil/DummyConnectionFactory.h b/activemq-cpp/src/test/activemq/cmsutil/DummyConnectionFactory.h index c3f09d618..e46a4de4e 100644 --- a/activemq-cpp/src/test/activemq/cmsutil/DummyConnectionFactory.h +++ b/activemq-cpp/src/test/activemq/cmsutil/DummyConnectionFactory.h @@ -29,31 +29,24 @@ namespace cmsutil { private: MessageContext messageContext; + cms::ExceptionListener* listener; + cms::MessageTransformer* transformer; public: virtual ~DummyConnectionFactory() {} - virtual cms::Connection* createConnection() throw ( cms::CMSException ) { - + virtual cms::Connection* createConnection() { return new DummyConnection(&messageContext); } - virtual cms::Connection* createConnection( const std::string& username, - const std::string& password ) - throw ( cms::CMSException ) { - + virtual cms::Connection* createConnection(const std::string& username, const std::string& password) { return new DummyConnection(&messageContext); } - virtual cms::Connection* createConnection( const std::string& username, - const std::string& password, - const std::string& clientId ) - throw ( cms::CMSException ) { - + virtual cms::Connection* createConnection(const std::string& username, const std::string& password, const std::string& clientId) { DummyConnection* c = new DummyConnection(&messageContext); c->setClientID(clientId); - return c; } @@ -61,6 +54,21 @@ namespace cmsutil { return &messageContext; } + virtual cms::ExceptionListener* getExceptionListener() const { + return listener; + } + + virtual void setExceptionListener(cms::ExceptionListener* listener) { + this->listener = listener; + } + + virtual cms::MessageTransformer* getMessageTransformer() const { + return transformer; + } + + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + this->transformer = transformer; + } }; }} diff --git a/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h b/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h index bcd447cd4..376b1d1e4 100644 --- a/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h +++ b/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h @@ -26,8 +26,10 @@ namespace cmsutil { class DummyConsumer : public cms::MessageConsumer { private: + std::string selector; cms::MessageListener* listener; + cms::MessageTransformer* transformer; MessageContext* messageContext; const cms::Destination* dest; bool noLocal; @@ -40,6 +42,7 @@ namespace cmsutil { this->noLocal = noLocal; this->dest = dest; this->listener = NULL; + this->transformer = NULL; } virtual ~DummyConsumer() {} @@ -74,6 +77,14 @@ namespace cmsutil { return selector; } + virtual cms::MessageTransformer* getMessageTransformer() const { + return transformer; + } + + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + this->transformer = transformer; + } + }; }} diff --git a/activemq-cpp/src/test/activemq/cmsutil/DummyProducer.h b/activemq-cpp/src/test/activemq/cmsutil/DummyProducer.h index 722462ce2..dcb1a8fe0 100644 --- a/activemq-cpp/src/test/activemq/cmsutil/DummyProducer.h +++ b/activemq-cpp/src/test/activemq/cmsutil/DummyProducer.h @@ -26,6 +26,7 @@ namespace cmsutil { class DummyProducer : public cms::MessageProducer { private: + const cms::Destination* dest; int deliveryMode; bool disableMessageId; @@ -33,21 +34,25 @@ namespace cmsutil { int priority; long long ttl; MessageContext* messageContext; + cms::MessageTransformer* transformer; public: DummyProducer(MessageContext* messageContext, const cms::Destination* dest) { - deliveryMode = 1; - disableMessageId = false; - disableMessageTimestamp = false; - priority = 4; - ttl = 0L; + this->deliveryMode = 1; + this->disableMessageId = false; + this->disableMessageTimestamp = false; + this->priority = 4; + this->ttl = 0L; this->dest = dest; this->messageContext = messageContext; + this->transformer = NULL; + } + virtual ~DummyProducer() { } - virtual ~DummyProducer() {} - virtual void close() {} + virtual void close() { + } /** * Sends the message to the default producer destination, but does @@ -58,7 +63,7 @@ namespace cmsutil { * The message to be sent. * @throws cms::CMSException */ - virtual void send( cms::Message* message ) throw ( cms::CMSException ){ + virtual void send(cms::Message* message) throw (cms::CMSException) { send(message, deliveryMode, priority, ttl); } @@ -76,8 +81,7 @@ namespace cmsutil { * The time to live value for this message in milliseconds. * @throws cms::CMSException */ - virtual void send( cms::Message* message, int deliveryMode, int priority, - long long timeToLive) throw ( cms::CMSException ){ + virtual void send(cms::Message* message, int deliveryMode, int priority, long long timeToLive) throw (cms::CMSException) { send(dest, message, deliveryMode, priority, timeToLive); } @@ -93,8 +97,7 @@ namespace cmsutil { * the message to be sent. * @throws cms::CMSException */ - virtual void send( const cms::Destination* destination, - cms::Message* message ) throw ( cms::CMSException ){ + virtual void send(const cms::Destination* destination, cms::Message* message) throw (cms::CMSException) { send(dest, message, deliveryMode, priority, ttl); } @@ -114,9 +117,8 @@ namespace cmsutil { * The time to live value for this message in milliseconds. * @throws cms::CMSException */ - virtual void send( const cms::Destination* destination, - cms::Message* message, int deliveryMode, int priority, - long long timeToLive) throw ( cms::CMSException ){ + virtual void send(const cms::Destination* destination, cms::Message* message, int deliveryMode, int priority, long long timeToLive) + throw (cms::CMSException) { messageContext->send(destination, message, deliveryMode, priority, timeToLive); } @@ -127,7 +129,7 @@ namespace cmsutil { * @param mode * The DeliveryMode */ - virtual void setDeliveryMode( int mode ) throw ( cms::CMSException ) { + virtual void setDeliveryMode(int mode) throw (cms::CMSException) { this->deliveryMode = mode; } @@ -136,7 +138,7 @@ namespace cmsutil { * * @return The DeliveryMode */ - virtual int getDeliveryMode() const throw ( cms::CMSException ) { + virtual int getDeliveryMode() const throw (cms::CMSException) { return deliveryMode; } @@ -146,7 +148,7 @@ namespace cmsutil { * @param value * boolean indicating enable / disable (true / false) */ - virtual void setDisableMessageID( bool value ) throw ( cms::CMSException ) { + virtual void setDisableMessageID(bool value) throw (cms::CMSException) { disableMessageId = value; } @@ -155,7 +157,7 @@ namespace cmsutil { * * @return boolean indicating enable / disable (true / false) */ - virtual bool getDisableMessageID() const throw ( cms::CMSException ) { + virtual bool getDisableMessageID() const throw (cms::CMSException) { return disableMessageId; } @@ -163,7 +165,7 @@ namespace cmsutil { * Sets if Message Time Stamps are disbled for this Producer * @param value - boolean indicating enable / disable (true / false) */ - virtual void setDisableMessageTimeStamp( bool value ) throw ( cms::CMSException ) { + virtual void setDisableMessageTimeStamp(bool value) throw (cms::CMSException) { disableMessageTimestamp = value; } @@ -172,7 +174,7 @@ namespace cmsutil { * * @return boolean indicating enable / disable (true / false) */ - virtual bool getDisableMessageTimeStamp() const throw ( cms::CMSException ) { + virtual bool getDisableMessageTimeStamp() const throw (cms::CMSException) { return disableMessageTimestamp; } @@ -182,7 +184,7 @@ namespace cmsutil { * @param priority * int value for Priority level */ - virtual void setPriority( int priority ) throw ( cms::CMSException ) { + virtual void setPriority(int priority) throw (cms::CMSException) { this->priority = priority; } @@ -191,7 +193,7 @@ namespace cmsutil { * * @return int based priority level */ - virtual int getPriority() const throw ( cms::CMSException ) { + virtual int getPriority() const throw (cms::CMSException) { return priority; } @@ -203,7 +205,7 @@ namespace cmsutil { * @param time * default time to live value in milliseconds */ - virtual void setTimeToLive( long long time ) throw ( cms::CMSException ) { + virtual void setTimeToLive(long long time) throw (cms::CMSException) { ttl = time; } @@ -212,10 +214,17 @@ namespace cmsutil { * * @return Time to live value in milliseconds */ - virtual long long getTimeToLive() const throw ( cms::CMSException ) { + virtual long long getTimeToLive() const throw (cms::CMSException) { return ttl; } + virtual cms::MessageTransformer* getMessageTransformer() const { + return transformer; + } + + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + this->transformer = transformer; + } }; }} diff --git a/activemq-cpp/src/test/activemq/cmsutil/DummySession.h b/activemq-cpp/src/test/activemq/cmsutil/DummySession.h index 689908e09..926999e96 100644 --- a/activemq-cpp/src/test/activemq/cmsutil/DummySession.h +++ b/activemq-cpp/src/test/activemq/cmsutil/DummySession.h @@ -35,101 +35,110 @@ namespace cmsutil { cms::Session::AcknowledgeMode mode; MessageContext* messageContext; + cms::MessageTransformer* transformer; public: DummySession(MessageContext* messageContext) { this->mode = cms::Session::AUTO_ACKNOWLEDGE; this->messageContext = messageContext; + this->transformer = NULL; } - virtual ~DummySession() {} + virtual ~DummySession() { + } - virtual void close() {} + virtual void close() { + } - virtual void start() {} + virtual void start() { + } - virtual void stop() {} + virtual void stop() { + } - virtual void commit() {} + virtual void commit() { + } - virtual void rollback() {} + virtual void rollback() { + } - virtual void recover() {} + virtual void recover() { + } - virtual cms::MessageConsumer* createConsumer( - const cms::Destination* destination ) { + virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination) { return new DummyConsumer(messageContext, destination, "", false); } - virtual cms::MessageConsumer* createConsumer( - const cms::Destination* destination, - const std::string& selector ) - throw ( cms::CMSException ) { + virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination, const std::string& selector) throw (cms::CMSException) { return new DummyConsumer(messageContext, destination, selector, false); } - virtual cms::MessageConsumer* createConsumer( - const cms::Destination* destination, - const std::string& selector, - bool noLocal ) - throw ( cms::CMSException ) { + virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination, const std::string& selector, bool noLocal) throw (cms::CMSException) { return new DummyConsumer(messageContext, destination, selector, noLocal); } - virtual cms::MessageConsumer* createDurableConsumer( - const cms::Topic* destination, - const std::string& name, - const std::string& selector, - bool noLocal = false ) { return NULL; } + virtual cms::MessageConsumer* createDurableConsumer(const cms::Topic* destination, const std::string& name, const std::string& selector, + bool noLocal = false) { + return NULL; + } - virtual cms::MessageProducer* createProducer( const cms::Destination* destination ) - { return new DummyProducer(messageContext, destination); } + virtual cms::MessageProducer* createProducer(const cms::Destination* destination) { + return new DummyProducer(messageContext, destination); + } - virtual cms::QueueBrowser* createBrowser( const cms::Queue* queue ) - { return NULL; } + virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue) { + return NULL; + } - virtual cms::QueueBrowser* createBrowser( const cms::Queue* queue, const std::string& selector ) - { return NULL; } + virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector) { + return NULL; + } - virtual cms::Queue* createQueue( const std::string& queueName ) - { - return new activemq::commands::ActiveMQQueue( queueName ); + virtual cms::Queue* createQueue(const std::string& queueName) { + return new activemq::commands::ActiveMQQueue(queueName); } - virtual cms::Topic* createTopic( const std::string& topicName ) { - return new activemq::commands::ActiveMQTopic( topicName ); + virtual cms::Topic* createTopic(const std::string& topicName) { + return new activemq::commands::ActiveMQTopic(topicName); } - virtual cms::TemporaryQueue* createTemporaryQueue() - { return NULL; } + virtual cms::TemporaryQueue* createTemporaryQueue() { + return NULL; + } - virtual cms::TemporaryTopic* createTemporaryTopic() - { return NULL; } + virtual cms::TemporaryTopic* createTemporaryTopic() { + return NULL; + } - virtual cms::Message* createMessage() - { return NULL; } + virtual cms::Message* createMessage() { + return NULL; + } - virtual cms::BytesMessage* createBytesMessage() - { return NULL; } + virtual cms::BytesMessage* createBytesMessage() { + return NULL; + } - virtual cms::BytesMessage* createBytesMessage( - const unsigned char* bytes, int bytesSize ) { + virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize) { return NULL; } - virtual cms::StreamMessage* createStreamMessage() - throw ( cms::CMSException ){ return NULL; } + virtual cms::StreamMessage* createStreamMessage() throw (cms::CMSException) { + return NULL; + } - virtual cms::TextMessage* createTextMessage() - throw ( cms::CMSException ){ return NULL; } + virtual cms::TextMessage* createTextMessage() throw (cms::CMSException) { + return NULL; + } - virtual cms::TextMessage* createTextMessage( const std::string& text ) - throw ( cms::CMSException ){ return NULL; } + virtual cms::TextMessage* createTextMessage(const std::string& text) throw (cms::CMSException) { + return NULL; + } - virtual cms::MapMessage* createMapMessage() - throw ( cms::CMSException ){ return NULL; } + virtual cms::MapMessage* createMapMessage() throw (cms::CMSException) { + return NULL; + } virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const { return mode; @@ -140,11 +149,19 @@ namespace cmsutil { } virtual bool isTransacted() const { - return mode==cms::Session::SESSION_TRANSACTED; + return mode == cms::Session::SESSION_TRANSACTED; } - virtual void unsubscribe( const std::string& name ) {} + virtual void unsubscribe(const std::string& name) { + } + + virtual cms::MessageTransformer* getMessageTransformer() const { + return transformer; + } + virtual void setMessageTransformer(cms::MessageTransformer* transformer) { + this->transformer = transformer; + } }; }}