Permalink
Browse files

fix for: https://issues.apache.org/jira/browse/AMQCPP-160

git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-cpp/trunk@1335280 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent a523328 commit b528e8a1ebf28d99ea31541e8614d61a2b4b4211 Timothy A. Bish committed May 7, 2012
Showing with 721 additions and 156 deletions.
  1. +13 −0 ...re-generator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageHeaderGenerator.java
  2. +2 −2 activemq-cpp/configure.ac
  3. +8 −0 activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h
  4. +8 −0 activemq-cpp/src/main/activemq/cmsutil/CachedProducer.h
  5. +8 −0 activemq-cpp/src/main/activemq/cmsutil/PooledSession.h
  6. +12 −0 activemq-cpp/src/main/activemq/commands/Message.h
  7. +14 −0 activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
  8. +10 −0 activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
  9. +17 −0 activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
  10. +18 −2 activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
  11. +10 −0 activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
  12. +4 −0 activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
  13. +8 −0 activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
  14. +20 −0 activemq-cpp/src/main/activemq/core/ActiveMQSession.h
  15. +79 −48 activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
  16. +8 −0 activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
  17. +15 −1 activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
  18. +26 −0 activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h
  19. +20 −1 activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
  20. +16 −0 activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
  21. +21 −0 activemq-cpp/src/main/cms/Connection.h
  22. +46 −3 activemq-cpp/src/main/cms/ConnectionFactory.h
  23. +22 −0 activemq-cpp/src/main/cms/MessageConsumer.h
  24. +22 −0 activemq-cpp/src/main/cms/MessageProducer.h
  25. +25 −0 activemq-cpp/src/main/cms/MessageTransformer.cpp
  26. +94 −0 activemq-cpp/src/main/cms/MessageTransformer.h
  27. +22 −0 activemq-cpp/src/main/cms/Session.h
  28. +17 −8 activemq-cpp/src/test/activemq/cmsutil/DummyConnection.h
  29. +20 −12 activemq-cpp/src/test/activemq/cmsutil/DummyConnectionFactory.h
  30. +11 −0 activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h
  31. +34 −25 activemq-cpp/src/test/activemq/cmsutil/DummyProducer.h
  32. +71 −54 activemq-cpp/src/test/activemq/cmsutil/DummySession.h
View
13 ...ator/src/main/java/org/apache/activemq/openwire/tool/commands/MessageHeaderGenerator.java
@@ -72,6 +72,19 @@ protected void generateAdditonalMembers( PrintWriter out ) {
super.generateAdditonalMembers(out);
+
+ out.println(" /**");
+ out.println(" * Create a Pointer based copy of this message. Useful for chaining a clone");
+ out.println(" * operation with other operation such as casting to a cms Message type.");
+ out.println(" *");
+ out.println(" * Pointer<cms::Message> cmsMsg = message->copy().dynamic_cast<cms::Message>();");
+ out.println(" *");
+ out.println(" * @returns a Pointer<Message> which is a duplicate of this object.");
+ out.println(" */");
+ out.println(" Pointer<Message> copy() const {");
+ out.println(" return Pointer<Message>(this->cloneDataStructure());");
+ out.println(" }");
+ out.println("");
out.println(" /**");
out.println(" * Handles the marshaling of the objects properties into the");
out.println(" * internal byte array before the object is marshaled to the");
View
4 activemq-cpp/configure.ac
@@ -38,8 +38,8 @@ ACTIVEMQ_API_VERSION=${ACTIVEMQ_VERSION}
## ------------------------------------------------------------
## Define the Version variables for the CMS API Library
## ------------------------------------------------------------
-CMSAPI_MAJOR_VERSION=2
-CMSAPI_MINOR_VERSION=4
+CMSAPI_MAJOR_VERSION=3
+CMSAPI_MINOR_VERSION=0
CMSAPI_VERSION=${CMSAPI_MAJOR_VERSION}.${CMSAPI_MINOR_VERSION}
AC_SUBST(CMSAPI_MAJOR_VERSION)
View
8 activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h
@@ -84,6 +84,14 @@ namespace cmsutil {
return consumer->getMessageSelector();
}
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ consumer->setMessageTransformer(transformer);
+ }
+
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return consumer->getMessageTransformer();
+ }
+
};
}}
View
8 activemq-cpp/src/main/activemq/cmsutil/CachedProducer.h
@@ -116,6 +116,14 @@ namespace cmsutil {
return producer->getTimeToLive();
}
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ producer->setMessageTransformer(transformer);
+ }
+
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return producer->getMessageTransformer();
+ }
+
};
}}
View
8 activemq-cpp/src/main/activemq/cmsutil/PooledSession.h
@@ -225,6 +225,14 @@ namespace cmsutil {
session->unsubscribe( name );
}
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ session->setMessageTransformer(transformer);
+ }
+
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return session->getMessageTransformer();
+ }
+
private:
std::string getUniqueDestName( const cms::Destination* dest );
View
12 activemq-cpp/src/main/activemq/commands/Message.h
@@ -137,6 +137,18 @@ namespace commands{
virtual bool equals( const DataStructure* value ) const;
/**
+ * Create a Pointer based copy of this message. Useful for chaining a clone
+ * operation with other operation such as casting to a cms Message type.
+ *
+ * Pointer<cms::Message> cmsMsg = message->copy().dynamic_cast<cms::Message>();
+ *
+ * @returns a Pointer<Message> which is a duplicate of this object.
+ */
+ Pointer<Message> copy() const {
+ return Pointer<Message>(this->cloneDataStructure());
+ }
+
+ /**
* Handles the marshaling of the objects properties into the
* internal byte array before the object is marshaled to the
* wire
View
14 activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -145,6 +145,7 @@ namespace core{
std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
cms::ExceptionListener* exceptionListener;
+ cms::MessageTransformer* transformer;
Pointer<commands::ConnectionInfo> connectionInfo;
Pointer<commands::BrokerInfo> brokerInfo;
@@ -190,6 +191,7 @@ namespace core{
defaultPrefetchPolicy(NULL),
defaultRedeliveryPolicy(NULL),
exceptionListener(NULL),
+ transformer(NULL),
connectionInfo(),
brokerInfo(),
brokerWireFormatInfo(),
@@ -400,6 +402,8 @@ cms::Session* ActiveMQConnection::createSession(cms::Session::AcknowledgeMode ac
Pointer<ActiveMQSessionKernel> session(new ActiveMQSessionKernel(
this, getNextSessionId(), ackMode, *this->config->properties));
+ session->setMessageTransformer(this->config->transformer);
+
this->addSession(session);
return new ActiveMQSession(session);
@@ -1210,6 +1214,16 @@ cms::ExceptionListener* ActiveMQConnection::getExceptionListener() const {
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->config->transformer = transformer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageTransformer* ActiveMQConnection::getMessageTransformer() const {
+ return this->config->transformer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setPrefetchPolicy(PrefetchPolicy* policy) {
this->config->defaultPrefetchPolicy.reset(policy);
}
View
10 activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -295,6 +295,16 @@ namespace core{
*/
virtual void setExceptionListener(cms::ExceptionListener* listener);
+ /**
+ * {@inheritDoc}
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer);
+
+ /**
+ * {@inheritDoc}
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const;
+
public: // Configuration Options
/**
View
17 activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
@@ -16,6 +16,7 @@
*/
#include "ActiveMQConnectionFactory.h"
+#include <cms/MessageTransformer.h>
#include <decaf/net/URI.h>
#include <decaf/util/Properties.h>
#include <decaf/lang/Boolean.h>
@@ -81,6 +82,7 @@ namespace core{
unsigned int producerWindowSize;
cms::ExceptionListener* defaultListener;
+ cms::MessageTransformer* defaultTransformer;
std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
@@ -100,6 +102,7 @@ namespace core{
closeTimeout(15000),
producerWindowSize(0),
defaultListener(NULL),
+ defaultTransformer(NULL),
defaultPrefetchPolicy(new DefaultPrefetchPolicy()),
defaultRedeliveryPolicy(new DefaultRedeliveryPolicy()) {
}
@@ -370,6 +373,10 @@ void ActiveMQConnectionFactory::configureConnection(ActiveMQConnection* connecti
if (this->settings->defaultListener) {
connection->setExceptionListener(this->settings->defaultListener);
}
+
+ if (this->settings->defaultTransformer) {
+ connection->setMessageTransformer(this->settings->defaultTransformer);
+ }
}
////////////////////////////////////////////////////////////////////////////////
@@ -428,6 +435,16 @@ cms::ExceptionListener* ActiveMQConnectionFactory::getExceptionListener() const
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->settings->defaultTransformer = transformer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageTransformer* ActiveMQConnectionFactory::getMessageTransformer() const {
+ return this->settings->defaultTransformer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setPrefetchPolicy(PrefetchPolicy* policy) {
this->settings->defaultPrefetchPolicy.reset(policy);
}
View
20 activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
@@ -188,15 +188,31 @@ namespace core{
* @param listener
* The listener to set on the connection or NULL for no listener.
*/
- void setExceptionListener(cms::ExceptionListener* listener);
+ virtual void setExceptionListener(cms::ExceptionListener* listener);
/**
* Returns the currently set ExceptionListener that will be set on any new Connection
* instance that is created by this factory.
*
* @return a pointer to a CMS ExceptionListener instance or NULL if not set.
*/
- cms::ExceptionListener* getExceptionListener() const;
+ virtual cms::ExceptionListener* getExceptionListener() const;
+
+ /**
+ * Set an MessageTransformer instance that is passed on to all Connection objects created from
+ * this ConnectionFactory
+ *
+ * @param transformer
+ * Pointer to the cms::MessageTransformer to set on all newly created Connection objects.
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer);
+
+ /**
+ * Gets the currently configured MessageTransformer for this ConnectionFactory.
+ *
+ * @returns the pointer to the currently set cms::MessageTransformer.
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const;
/**
* Sets the PrefetchPolicy instance that this factory should use when it creates
View
10 activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
@@ -189,3 +189,13 @@ const Pointer<commands::ConsumerId>& ActiveMQConsumer::getConsumerId() const {
decaf::lang::Exception* ActiveMQConsumer::getFailureError() const {
return this->config->kernel->getFailureError();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->config->kernel->setMessageTransformer(transformer);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageTransformer* ActiveMQConsumer::getMessageTransformer() const {
+ return this->config->kernel->getMessageTransformer();
+}
View
4 activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
@@ -81,6 +81,10 @@ namespace core{
virtual std::string getMessageSelector() const;
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer);
+
+ virtual cms::MessageTransformer* getMessageTransformer() const;
+
public:
/**
View
8 activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
@@ -165,6 +165,14 @@ namespace core{
return this->kernel->getSendTimeout();
}
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->kernel->setMessageTransformer(transformer);
+ }
+
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return this->kernel->getMessageTransformer();
+ }
+
public:
/**
View
20 activemq-cpp/src/main/activemq/core/ActiveMQSession.h
@@ -150,6 +150,26 @@ namespace core{
}
/**
+ * Set an MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer
+ * objects created from this Session.
+ *
+ * @param transformer
+ * Pointer to the cms::MessageTransformer to set on all MessageConsumers and MessageProducers.
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->kernel->setMessageTransformer(transformer);
+ }
+
+ /**
+ * Gets the currently configured MessageTransformer for this Session.
+ *
+ * @returns the pointer to the currently set cms::MessageTransformer.
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return this->kernel->getMessageTransformer();
+ }
+
+ /**
* Gets the Session Information object for this session, if the
* session is closed than this method throws an exception.
* @return SessionInfo Reference
View
127 activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -28,6 +28,7 @@
#include <activemq/util/Config.h>
#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/util/ActiveMQProperties.h>
+#include <activemq/util/ActiveMQMessageTransformation.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/commands/Message.h>
#include <activemq/commands/MessageAck.h>
@@ -46,6 +47,7 @@
#include <activemq/core/kernels/ActiveMQSessionKernel.h>
#include <activemq/threads/Scheduler.h>
#include <cms/ExceptionListener.h>
+#include <cms/MessageTransformer.h>
#include <memory>
using namespace std;
@@ -75,6 +77,7 @@ namespace kernels {
public:
cms::MessageListener* listener;
+ cms::MessageTransformer* transformer;
decaf::util::concurrent::Mutex listenerMutex;
AtomicBoolean deliveringAcks;
AtomicBoolean started;
@@ -93,6 +96,7 @@ namespace kernels {
Pointer<Scheduler> scheduler;
ActiveMQConsumerKernelConfig() : listener(NULL),
+ transformer(NULL),
listenerMutex(),
deliveringAcks(),
started(),
@@ -311,7 +315,9 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
bool noLocal,
bool browser,
bool dispatchAsync,
- cms::MessageListener* listener) : internal(NULL), session(NULL), consumerInfo() {
+ cms::MessageListener* listener) : internal(NULL),
+ session(NULL),
+ consumerInfo() {
if (session == NULL) {
throw IllegalArgumentException(__FILE__, __LINE__, "Consumer created with NULL Session");
@@ -598,19 +604,12 @@ cms::Message* ActiveMQConsumerKernel::receive() {
return NULL;
}
- // Message pre-processing
beforeMessageIsConsumed(message);
-
- // Need to clone the message because the user is responsible for freeing
- // its copy of the message.
- cms::Message* clonedMessage =
- dynamic_cast<cms::Message*>(message->getMessage()->cloneDataStructure());
-
- // Post processing (may result in the message being deleted)
afterMessageIsConsumed(message, false);
- // Return the cloned message.
- return clonedMessage;
+ // Need to clone the message because the user is responsible for freeing
+ // its copy of the message, createCMSMessage will do this for us.
+ return createCMSMessage(message).release();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -631,19 +630,12 @@ cms::Message* ActiveMQConsumerKernel::receive( int millisecs ) {
return NULL;
}
- // Message preprocessing
beforeMessageIsConsumed(message);
-
- // Need to clone the message because the user is responsible for freeing
- // its copy of the message.
- cms::Message* clonedMessage =
- dynamic_cast<cms::Message*>(message->getMessage()->cloneDataStructure());
-
- // Post processing (may result in the message being deleted)
afterMessageIsConsumed(message, false);
- // Return the cloned message.
- return clonedMessage;
+ // Need to clone the message because the user is responsible for freeing
+ // its copy of the message, createCMSMessage will do this for us.
+ return createCMSMessage(message).release();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -664,25 +656,18 @@ cms::Message* ActiveMQConsumerKernel::receiveNoWait() {
return NULL;
}
- // Message preprocessing
beforeMessageIsConsumed(message);
-
- // Need to clone the message because the user is responsible for freeing
- // its copy of the message.
- cms::Message* clonedMessage =
- dynamic_cast<cms::Message*>(message->getMessage()->cloneDataStructure());
-
- // Post processing (may result in the message being deleted)
afterMessageIsConsumed(message, false);
- // Return the cloned message.
- return clonedMessage;
+ // Need to clone the message because the user is responsible for freeing
+ // its copy of the message, createCMSMessage will do this for us.
+ return createCMSMessage(message).release();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::setMessageListener( cms::MessageListener* listener ) {
+void ActiveMQConsumerKernel::setMessageListener(cms::MessageListener* listener) {
try {
@@ -722,16 +707,6 @@ void ActiveMQConsumerKernel::setMessageListener( cms::MessageListener* listener
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::beforeMessageIsConsumed(const Pointer<MessageDispatch>& dispatch) {
- // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
- // we set the handler in the message to this object and send it out.
- if (session->isClientAcknowledge()) {
- Pointer<ActiveMQAckHandler> ackHandler(new ClientAckHandler(this->session));
- dispatch->getMessage()->setAckHandler(ackHandler);
- } else if (session->isIndividualAcknowledge()) {
- Pointer<ActiveMQAckHandler> ackHandler(new IndividualAckHandler(this, dispatch));
- dispatch->getMessage()->setAckHandler(ackHandler);
- }
-
this->internal->lastDeliveredSequenceId = dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
if (!isAutoAcknowledgeBatch()) {
@@ -1102,8 +1077,7 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
clearMessagesInProgress();
if (this->internal->clearDispatchList) {
- // we are reconnecting so lets flush the in progress
- // messages
+ // we are reconnecting so lets flush the in progress messages
this->internal->clearDispatchList = false;
this->internal->unconsumedMessages->clear();
}
@@ -1113,8 +1087,6 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
// Don't dispatch expired messages, ack it and then destroy it
if (dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired()) {
this->ackLater(dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED);
-
- // stop now, don't queue
return;
}
@@ -1123,9 +1095,9 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
// If we have a listener, send the message.
if (this->internal->listener != NULL && internal->unconsumedMessages->isRunning()) {
+ Pointer<cms::Message> message = createCMSMessage(dispatch);
beforeMessageIsConsumed(dispatch);
- this->internal->listener->onMessage(
- dynamic_cast<cms::Message*> (dispatch->getMessage().get()));
+ this->internal->listener->onMessage(message.get());
afterMessageIsConsumed(dispatch, false);
} else {
@@ -1144,6 +1116,55 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
}
////////////////////////////////////////////////////////////////////////////////
+Pointer<cms::Message> ActiveMQConsumerKernel::createCMSMessage(Pointer<MessageDispatch> dispatch) {
+
+ try {
+
+ Pointer<Message> message = dispatch->getMessage()->copy();
+ if (this->internal->transformer != NULL) {
+ cms::Message* source = dynamic_cast<cms::Message*>(message.get());
+ cms::Message* transformed = NULL;
+
+ if (this->internal->transformer->consumerTransform(
+ (cms::Session*)this->session, (cms::MessageConsumer*)this, source, &transformed)) {
+
+ if (transformed == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Client MessageTransformer returned a NULL message");
+ }
+
+ Message* amqMessage = NULL;
+
+ // If the transform create a new ActiveMQ Message command then we can discard the transformed
+ // cms::Message here, otherwise the transformed message was already an ActiveMQ Message
+ // command of some sort so we just place casted amqMessage in our Pointer and let it get
+ // cleaned up after its been dispatched.
+ if (ActiveMQMessageTransformation::transformMessage(transformed, this->session->getConnection(), &amqMessage)){
+ delete transformed;
+ }
+
+ message.reset(amqMessage);
+ }
+ }
+
+ // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
+ // we set the handler in the message to this object and send it out.
+ if (session->isClientAcknowledge()) {
+ Pointer<ActiveMQAckHandler> ackHandler(new ClientAckHandler(this->session));
+ message->setAckHandler(ackHandler);
+ } else if (session->isIndividualAcknowledge()) {
+ Pointer<ActiveMQAckHandler> ackHandler(new IndividualAckHandler(this, dispatch));
+ message->setAckHandler(ackHandler);
+ }
+
+ return message.dynamicCast<cms::Message>();
+ }
+ AMQ_CATCH_RETHROW( cms::CMSException )
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::sendPullRequest(long long timeout) {
try {
@@ -1317,6 +1338,16 @@ cms::MessageListener* ActiveMQConsumerKernel::getMessageListener() const {
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->internal->transformer = transformer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageTransformer* ActiveMQConsumerKernel::getMessageTransformer() const {
+ return this->internal->transformer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
const Pointer<commands::ConsumerInfo>& ActiveMQConsumerKernel::getConsumerInfo() const {
this->checkClosed();
return this->consumerInfo;
View
8 activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
@@ -106,6 +106,10 @@ namespace kernels {
virtual void acknowledge(const Pointer<commands::MessageDispatch>& dispatch);
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer);
+
+ virtual cms::MessageTransformer* getMessageTransformer() const;
+
public: // Dispatcher Methods
virtual void dispatch( const Pointer<MessageDispatch>& message );
@@ -303,6 +307,10 @@ namespace kernels {
private:
+ // Creates a deliverable cms::Message from a received MessageDispatch, transforming if needed
+ // and configuring appropriate ack handlers.
+ Pointer<cms::Message> createCMSMessage(Pointer<commands::MessageDispatch> dispatch);
+
// Using options from the Destination URI override any settings that are
// defined for this consumer.
void applyDestinationOptions(const Pointer<commands::ConsumerInfo>& info);
View
16 activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
@@ -202,6 +202,20 @@ void ActiveMQProducerKernel::send(const cms::Destination* destination, cms::Mess
throw cms::CMSException("No destination specified", NULL);
}
+ cms::Message* outbound = message;
+ Pointer<cms::Message> scopedMessage;
+ if (this->transformer != NULL) {
+ if (this->transformer->producerTransform(this->session, this, message, &outbound)) {
+ // scopedMessage ensures that when we are responsible for the lifetime of the
+ // transformed message, the message remains valid until the send operation either
+ // succeeds or throws an exception.
+ scopedMessage.reset(outbound);
+ }
+ if (outbound == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "MessageTransformer set transformed message to NULL");
+ }
+ }
+
if (this->memoryUsage.get() != NULL) {
try {
this->memoryUsage->waitForSpace();
@@ -210,7 +224,7 @@ void ActiveMQProducerKernel::send(const cms::Destination* destination, cms::Mess
}
}
- this->session->send(this, dest, message, deliveryMode, priority, timeToLive,
+ this->session->send(this, dest, outbound, deliveryMode, priority, timeToLive,
this->memoryUsage.get(), this->sendTimeout);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
View
26 activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h
@@ -22,6 +22,7 @@
#include <cms/Message.h>
#include <cms/Destination.h>
#include <cms/DeliveryMode.h>
+#include <cms/MessageTransformer.h>
#include <activemq/util/Config.h>
#include <activemq/util/MemoryUsage.h>
@@ -79,6 +80,9 @@ namespace kernels {
// Generator of Message Sequence Id numbers for this producer.
util::LongSequenceGenerator messageSequence;
+ // Used to tranform Message before sending them to the CMS bus.
+ cms::MessageTransformer* transformer;
+
private:
ActiveMQProducerKernel(const ActiveMQProducerKernel&);
@@ -91,6 +95,8 @@ namespace kernels {
*
* @param session
* The Session which is the parent of this Producer.
+ * @param parent
+ * Pointer to the cms::MessageProducer that will wrap this kernel object.
* @param producerId
* Pointer to a ProducerId object which identifies this producer.
* @param destination
@@ -120,6 +126,26 @@ namespace kernels {
int deliveryMode, int priority, long long timeToLive);
/**
+ * Set an MessageTransformer instance that is applied to all cms::Message objects before they
+ * are sent on to the CMS bus.
+ *
+ * @param transformer
+ * Pointer to the cms::MessageTransformer to apply on each cms:;MessageSend.
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->transformer = transformer;
+ }
+
+ /**
+ * Gets the currently configured MessageTransformer for this MessageProducer.
+ *
+ * @returns the pointer to the currently set cms::MessageTransformer.
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return this->transformer;
+ }
+
+ /**
* Sets the delivery mode for this Producer
* @param mode - The DeliveryMode to use for Message sends.
*/
View
21 activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
@@ -102,10 +102,13 @@ namespace kernels{
Pointer<CloseSynhcronization> closeSync;
ConsumersMap consumers;
Mutex sendMutex;
+ cms::MessageTransformer* transformer;
public:
- SessionConfig() : synchronizationRegistered(false), producers(), scheduler(), closeSync(), consumers(), sendMutex() {}
+ SessionConfig() : synchronizationRegistered(false),
+ producers(), scheduler(), closeSync(),
+ consumers(), sendMutex(), transformer(NULL) {}
~SessionConfig() {}
};
@@ -537,6 +540,8 @@ cms::MessageConsumer* ActiveMQSessionKernel::createConsumer(const cms::Destinati
throw ex;
}
+ consumer->setMessageTransformer(this->config->transformer);
+
if (this->connection->isStarted()) {
consumer->start();
}
@@ -579,6 +584,8 @@ cms::MessageConsumer* ActiveMQSessionKernel::createDurableConsumer(const cms::To
throw ex;
}
+ consumer->setMessageTransformer(this->config->transformer);
+
if (this->connection->isStarted()) {
consumer->start();
}
@@ -627,6 +634,8 @@ cms::MessageProducer* ActiveMQSessionKernel::createProducer( const cms::Destinat
throw ex;
}
+ producer->setMessageTransformer(this->config->transformer);
+
return new ActiveMQProducer(producer);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -942,6 +951,16 @@ cms::ExceptionListener* ActiveMQSessionKernel::getExceptionListener() {
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->config->transformer = transformer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageTransformer* ActiveMQSessionKernel::getMessageTransformer() const {
+ return this->config->transformer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
Pointer<Scheduler> ActiveMQSessionKernel::getScheduler() const {
return this->config->scheduler;
}
View
16 activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
@@ -299,6 +299,22 @@ namespace kernels {
cms::ExceptionListener* getExceptionListener();
/**
+ * Set an MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer
+ * objects created from this Session.
+ *
+ * @param transformer
+ * Pointer to the cms::MessageTransformer to set on all MessageConsumers and MessageProducers.
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer);
+
+ /**
+ * Gets the currently configured MessageTransformer for this Session.
+ *
+ * @returns the pointer to the currently set cms::MessageTransformer.
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const;
+
+ /**
* Gets the Session Information object for this session, if the
* session is closed than this method throws an exception.
* @return SessionInfo Reference
View
21 activemq-cpp/src/main/cms/Connection.h
@@ -28,6 +28,7 @@
namespace cms{
class ExceptionListener;
+ class MessageTransformer;
/**
* The client's connection to its provider.
@@ -158,6 +159,26 @@ namespace cms{
*/
virtual void setExceptionListener(ExceptionListener* listener) = 0;
+ /**
+ * Set an MessageTransformer instance that is passed on to all Session objects created from
+ * this Connection.
+ *
+ * The CMS code never takes ownership of the MessageTransformer pointer which implies that
+ * the client code must ensure that the object remains valid for the lifetime of the CMS
+ * object to which the MessageTransformer has been assigned.
+ *
+ * @param transformer
+ * Pointer to the cms::MessageTransformer to set on all newly created Session objects.
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0;
+
+ /**
+ * Gets the currently configured MessageTransformer for this Connection.
+ *
+ * @returns the pointer to the currently set cms::MessageTransformer.
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const = 0;
+
};
}
View
49 activemq-cpp/src/main/cms/ConnectionFactory.h
@@ -18,13 +18,16 @@
#define _CMS_CONNECTIONFACTORY_H_
#include <cms/Config.h>
-#include <cms/Connection.h>
#include <cms/CMSException.h>
#include <string>
namespace cms{
+ class Connection;
+ class ExceptionListener;
+ class MessageTransformer;
+
/**
* Defines the interface for a factory that creates connection objects, the Connection
* objects returned implement the CMS Connection interface and hide the CMS Provider
@@ -51,7 +54,7 @@ namespace cms{
*
* @throws CMSException if an internal error occurs while creating the Connection.
*/
- virtual Connection* createConnection() = 0;
+ virtual cms::Connection* createConnection() = 0;
/**
* Creates a connection with the default specified identity. The
@@ -96,6 +99,46 @@ namespace cms{
*/
virtual cms::Connection* createConnection(const std::string& username, const std::string& password, const std::string& clientId) = 0;
+ /**
+ * Set an ExceptionListener instance that is passed on to all Connection objects created from
+ * this ConnectionFactory
+ *
+ * @param transformer
+ * Pointer to the cms::ExceptionListener to set on all newly created Connection objects/
+ */
+ virtual void setExceptionListener(cms::ExceptionListener* listener) = 0;
+
+ /**
+ * Gets the currently configured ExceptionListener for this ConnectionFactory.
+ *
+ * The CMS code never takes ownership of the ExceptionListener pointer which implies that
+ * the client code must ensure that the object remains valid for the lifetime of the CMS
+ * object to which the ExceptionListener has been assigned.
+ *
+ * @returns the pointer to the currently set cms::ExceptionListener.
+ */
+ virtual cms::ExceptionListener* getExceptionListener() const = 0;
+
+ /**
+ * Set an MessageTransformer instance that is passed on to all Connection objects created from
+ * this ConnectionFactory
+ *
+ * The CMS code never takes ownership of the MessageTransformer pointer which implies that
+ * the client code must ensure that the object remains valid for the lifetime of the CMS
+ * object to which the MessageTransformer has been assigned.
+ *
+ * @param transformer
+ * Pointer to the cms::MessageTransformer to set on all newly created Connection objects.
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0;
+
+ /**
+ * Gets the currently configured MessageTransformer for this ConnectionFactory.
+ *
+ * @returns the pointer to the currently set cms::MessageTransformer.
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const = 0;
+
public:
/**
@@ -113,7 +156,7 @@ namespace cms{
*
* @throws CMSException if an internal error occurs while creating the ConnectionFactory.
*/
- static ConnectionFactory* createCMSConnectionFactory(const std::string& brokerURI);
+ static cms::ConnectionFactory* createCMSConnectionFactory(const std::string& brokerURI);
};
View
22 activemq-cpp/src/main/cms/MessageConsumer.h
@@ -27,6 +27,8 @@
namespace cms{
+ class MessageTransformer;
+
/**
* A client uses a <code>MessageConsumer</code> to received messages
* from a destination.
@@ -120,6 +122,26 @@ namespace cms{
*/
virtual std::string getMessageSelector() const = 0;
+ /**
+ * Set an MessageTransformer instance that is applied to all cms::Message objects before they
+ * are dispatched to client code.
+ *
+ * The CMS code never takes ownership of the MessageTransformer pointer which implies that
+ * the client code must ensure that the object remains valid for the lifetime of the CMS
+ * object to which the MessageTransformer has been assigned.
+ *
+ * @param transformer
+ * Pointer to the cms::MessageTransformer to apply on each cms:;Message dispatch.
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0;
+
+ /**
+ * Gets the currently configured MessageTransformer for this MessageConsumer.
+ *
+ * @returns the pointer to the currently set cms::MessageTransformer.
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const = 0;
+
};
}
View
22 activemq-cpp/src/main/cms/MessageProducer.h
@@ -30,6 +30,8 @@
namespace cms{
+ class MessageTransformer;
+
/**
* A client uses a <code>MessageProducer</code> object to send messages to
* a Destination. A <code>MessageProducer</code> object is created by
@@ -237,6 +239,26 @@ namespace cms{
*/
virtual long long getTimeToLive() const = 0;
+ /**
+ * Set an MessageTransformer instance that is applied to all cms::Message objects before they
+ * are sent on to the CMS bus.
+ *
+ * The CMS code never takes ownership of the MessageTransformer pointer which implies that
+ * the client code must ensure that the object remains valid for the lifetime of the CMS
+ * object to which the MessageTransformer has been assigned.
+ *
+ * @param transformer
+ * Pointer to the cms::MessageTransformer to apply on each cms:;MessageSend.
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0;
+
+ /**
+ * Gets the currently configured MessageTransformer for this MessageProducer.
+ *
+ * @returns the pointer to the currently set cms::MessageTransformer.
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const = 0;
+
};
}
View
25 activemq-cpp/src/main/cms/MessageTransformer.cpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessageTransformer.h"
+
+using namespace cms;
+
+////////////////////////////////////////////////////////////////////////////////
+MessageTransformer::~MessageTransformer() {
+}
+
View
94 activemq-cpp/src/main/cms/MessageTransformer.h
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_MESSAGETRANSFORMER_H_
+#define _CMS_MESSAGETRANSFORMER_H_
+
+#include <cms/Config.h>
+
+namespace cms {
+
+ class MessageProducer;
+ class MessageConsumer;
+ class Message;
+ class Session;
+
+ /**
+ * Provides an interface for clients to transform cms::Message objects inside the
+ * CMS MessageProducer and MessageConsumer objects before the message's are sent or
+ * received.
+ *
+ * @since 3.0
+ */
+ class MessageTransformer {
+ public:
+
+ virtual ~MessageTransformer();
+
+ /**
+ * Transforms the given message inside the producer before it is sent to the CMS bus.
+ *
+ * The contract of this method is that the resulting transformed message pointer is set if and
+ * only if the method actually creates a new cms::Message object, otherwise it must always be
+ * set to NULL. The return value indicates whether a transformation took place and indicates
+ * that the resulting transformed cms::Message pointer will need to be deleted once the
+ * producer has sent the cms::Message on to the CMS bus.
+ *
+ * @param session
+ * The Session used to create the target MessageProducer for this transformation.
+ * @param producer
+ * The MessageProducer instance that is going to handle sending the transformed Message.
+ * @param message
+ * The CMS Message object that is to be transformed by this method.
+ * @param transformed
+ * A pointer to the location in memory where the newly transformed Message has been allocated.
+ *
+ * @returns true if the MessageProducer should handle deleting the transformed Message once sent.
+ *
+ * @throws cms::CMSException if an error occurs during the transform operation.
+ */
+ virtual bool producerTransform(cms::Session* session, cms::MessageProducer* producer, cms::Message* message, cms::Message** transformed) = 0;
+
+ /**
+ * Transforms the given message inside the consumer before it is dispatched to the client code.
+ *
+ * The contract of this method is that the resulting transformed message pointer is set if and
+ * only if the method actually creates a new cms::Message object, otherwise it must always be
+ * set to NULL. The return value indicates whether a transformation took place and indicates
+ * that the resulting transformed cms::Message pointer will need to be deleted once the
+ * consumer completed message dispatch.
+ *
+ * @param session
+ * The Session used to create the target MessageConsumer for this transformation.
+ * @param consumer
+ * The MessageConsumer instance that is going to handle dispatching the transformed Message.
+ * @param message
+ * The CMS Message object that is to be transformed by this method.
+ * @param transformed
+ * A pointer to the location in memory where the newly transformed Message has been allocated.
+ *
+ * @returns true if the MessageConsumer should handle deleting the transformed Message once sent.
+ *
+ * @throws cms::CMSException if an error occurs during the transform operation.
+ */
+ virtual bool consumerTransform(cms::Session* session, cms::MessageConsumer* consumer, cms::Message* message, cms::Message** transformed) = 0;
+
+ };
+
+}
+
+#endif /* _CMS_MESSAGETRANSFORMER_H_ */
View
22 activemq-cpp/src/main/cms/Session.h
@@ -37,6 +37,8 @@
namespace cms{
+ class MessageTransformer;
+
/**
* A Session object is a single-threaded context for producing and consuming
* messages.
@@ -443,6 +445,26 @@ namespace cms{
*/
virtual void unsubscribe(const std::string& name) = 0;
+ /**
+ * Set an MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer
+ * objects created from this Session.
+ *
+ * The CMS code never takes ownership of the MessageTransformer pointer which implies that
+ * the client code must ensure that the object remains valid for the lifetime of the CMS
+ * object to which the MessageTransformer has been assigned.
+ *
+ * @param transformer
+ * Pointer to the cms::MessageTransformer to set on all MessageConsumers and MessageProducers.
+ */
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) = 0;
+
+ /**
+ * Gets the currently configured MessageTransformer for this Session.
+ *
+ * @returns the pointer to the currently set cms::MessageTransformer.
+ */
+ virtual cms::MessageTransformer* getMessageTransformer() const = 0;
+
};
}
View
25 activemq-cpp/src/test/activemq/cmsutil/DummyConnection.h
@@ -26,19 +26,21 @@ namespace cmsutil {
class MessageContext;
- class DummyConnection : public cms::Connection {
+ class DummyConnection: public cms::Connection {
private:
cms::ExceptionListener* listener;
+ cms::MessageTransformer* transformer;
std::string clientId;
MessageContext* messageContext;
public:
- DummyConnection(MessageContext* messageContext ) {
+ DummyConnection(MessageContext* messageContext) {
this->messageContext = messageContext;
}
- virtual ~DummyConnection() {}
+ virtual ~DummyConnection() {
+ }
virtual const cms::ConnectionMetaData* getMetaData() const {
return NULL;
@@ -53,12 +55,11 @@ namespace cmsutil {
virtual void stop() {
}
- virtual cms::Session* createSession() throw ( cms::CMSException ) {
+ virtual cms::Session* createSession() throw (cms::CMSException) {
return new DummySession(messageContext);
}
- virtual cms::Session* createSession( cms::Session::AcknowledgeMode ackMode )
- throw ( cms::CMSException ) {
+ virtual cms::Session* createSession(cms::Session::AcknowledgeMode ackMode) throw (cms::CMSException) {
DummySession* s = new DummySession(messageContext);
s->setAcknowledgeMode(ackMode);
@@ -69,17 +70,25 @@ namespace cmsutil {
return clientId;
}
- virtual void setClientID( const std::string& id ) {
+ virtual void setClientID(const std::string& id) {
this->clientId = id;
}
virtual cms::ExceptionListener* getExceptionListener() const {
return listener;
}
- virtual void setExceptionListener( cms::ExceptionListener* listener ) {
+ virtual void setExceptionListener(cms::ExceptionListener* listener) {
this->listener = listener;
}
+
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return transformer;
+ }
+
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->transformer = transformer;
+ }
};
}}
View
32 activemq-cpp/src/test/activemq/cmsutil/DummyConnectionFactory.h
@@ -29,38 +29,46 @@ namespace cmsutil {
private:
MessageContext messageContext;
+ cms::ExceptionListener* listener;
+ cms::MessageTransformer* transformer;
public:
virtual ~DummyConnectionFactory() {}
- virtual cms::Connection* createConnection() throw ( cms::CMSException ) {
-
+ virtual cms::Connection* createConnection() {
return new DummyConnection(&messageContext);
}
- virtual cms::Connection* createConnection( const std::string& username,
- const std::string& password )
- throw ( cms::CMSException ) {
-
+ virtual cms::Connection* createConnection(const std::string& username, const std::string& password) {
return new DummyConnection(&messageContext);
}
- virtual cms::Connection* createConnection( const std::string& username,
- const std::string& password,
- const std::string& clientId )
- throw ( cms::CMSException ) {
-
+ virtual cms::Connection* createConnection(const std::string& username, const std::string& password, const std::string& clientId) {
DummyConnection* c = new DummyConnection(&messageContext);
c->setClientID(clientId);
-
return c;
}
MessageContext* getMessageContext() {
return &messageContext;
}
+ virtual cms::ExceptionListener* getExceptionListener() const {
+ return listener;
+ }
+
+ virtual void setExceptionListener(cms::ExceptionListener* listener) {
+ this->listener = listener;
+ }
+
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return transformer;
+ }
+
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->transformer = transformer;
+ }
};
}}
View
11 activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h
@@ -26,8 +26,10 @@ namespace cmsutil {
class DummyConsumer : public cms::MessageConsumer {
private:
+
std::string selector;
cms::MessageListener* listener;
+ cms::MessageTransformer* transformer;
MessageContext* messageContext;
const cms::Destination* dest;
bool noLocal;
@@ -40,6 +42,7 @@ namespace cmsutil {
this->noLocal = noLocal;
this->dest = dest;
this->listener = NULL;
+ this->transformer = NULL;
}
virtual ~DummyConsumer() {}
@@ -74,6 +77,14 @@ namespace cmsutil {
return selector;
}
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return transformer;
+ }
+
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->transformer = transformer;
+ }
+
};
}}
View
59 activemq-cpp/src/test/activemq/cmsutil/DummyProducer.h
@@ -26,28 +26,33 @@ namespace cmsutil {
class DummyProducer : public cms::MessageProducer {
private:
+
const cms::Destination* dest;
int deliveryMode;
bool disableMessageId;
bool disableMessageTimestamp;
int priority;
long long ttl;
MessageContext* messageContext;
+ cms::MessageTransformer* transformer;
public:
DummyProducer(MessageContext* messageContext, const cms::Destination* dest) {
- deliveryMode = 1;
- disableMessageId = false;
- disableMessageTimestamp = false;
- priority = 4;
- ttl = 0L;
+ this->deliveryMode = 1;
+ this->disableMessageId = false;
+ this->disableMessageTimestamp = false;
+ this->priority = 4;
+ this->ttl = 0L;
this->dest = dest;
this->messageContext = messageContext;
+ this->transformer = NULL;
+ }
+ virtual ~DummyProducer() {
}
- virtual ~DummyProducer() {}
- virtual void close() {}
+ virtual void close() {
+ }
/**
* Sends the message to the default producer destination, but does
@@ -58,7 +63,7 @@ namespace cmsutil {
* The message to be sent.
* @throws cms::CMSException
*/
- virtual void send( cms::Message* message ) throw ( cms::CMSException ){
+ virtual void send(cms::Message* message) throw (cms::CMSException) {
send(message, deliveryMode, priority, ttl);
}
@@ -76,8 +81,7 @@ namespace cmsutil {
* The time to live value for this message in milliseconds.
* @throws cms::CMSException
*/
- virtual void send( cms::Message* message, int deliveryMode, int priority,
- long long timeToLive) throw ( cms::CMSException ){
+ virtual void send(cms::Message* message, int deliveryMode, int priority, long long timeToLive) throw (cms::CMSException) {
send(dest, message, deliveryMode, priority, timeToLive);
}
@@ -93,8 +97,7 @@ namespace cmsutil {
* the message to be sent.
* @throws cms::CMSException
*/
- virtual void send( const cms::Destination* destination,
- cms::Message* message ) throw ( cms::CMSException ){
+ virtual void send(const cms::Destination* destination, cms::Message* message) throw (cms::CMSException) {
send(dest, message, deliveryMode, priority, ttl);
}
@@ -114,9 +117,8 @@ namespace cmsutil {
* The time to live value for this message in milliseconds.
* @throws cms::CMSException
*/
- virtual void send( const cms::Destination* destination,
- cms::Message* message, int deliveryMode, int priority,
- long long timeToLive) throw ( cms::CMSException ){
+ virtual void send(const cms::Destination* destination, cms::Message* message, int deliveryMode, int priority, long long timeToLive)
+ throw (cms::CMSException) {
messageContext->send(destination, message, deliveryMode, priority, timeToLive);
}
@@ -127,7 +129,7 @@ namespace cmsutil {
* @param mode
* The DeliveryMode
*/
- virtual void setDeliveryMode( int mode ) throw ( cms::CMSException ) {
+ virtual void setDeliveryMode(int mode) throw (cms::CMSException) {
this->deliveryMode = mode;
}
@@ -136,7 +138,7 @@ namespace cmsutil {
*
* @return The DeliveryMode
*/
- virtual int getDeliveryMode() const throw ( cms::CMSException ) {
+ virtual int getDeliveryMode() const throw (cms::CMSException) {
return deliveryMode;
}
@@ -146,7 +148,7 @@ namespace cmsutil {
* @param value
* boolean indicating enable / disable (true / false)
*/
- virtual void setDisableMessageID( bool value ) throw ( cms::CMSException ) {
+ virtual void setDisableMessageID(bool value) throw (cms::CMSException) {
disableMessageId = value;
}
@@ -155,15 +157,15 @@ namespace cmsutil {
*
* @return boolean indicating enable / disable (true / false)
*/
- virtual bool getDisableMessageID() const throw ( cms::CMSException ) {
+ virtual bool getDisableMessageID() const throw (cms::CMSException) {
return disableMessageId;
}
/**
* Sets if Message Time Stamps are disbled for this Producer
* @param value - boolean indicating enable / disable (true / false)
*/
- virtual void setDisableMessageTimeStamp( bool value ) throw ( cms::CMSException ) {
+ virtual void setDisableMessageTimeStamp(bool value) throw (cms::CMSException) {
disableMessageTimestamp = value;
}
@@ -172,7 +174,7 @@ namespace cmsutil {
*
* @return boolean indicating enable / disable (true / false)
*/
- virtual bool getDisableMessageTimeStamp() const throw ( cms::CMSException ) {
+ virtual bool getDisableMessageTimeStamp() const throw (cms::CMSException) {
return disableMessageTimestamp;
}
@@ -182,7 +184,7 @@ namespace cmsutil {
* @param priority
* int value for Priority level
*/
- virtual void setPriority( int priority ) throw ( cms::CMSException ) {
+ virtual void setPriority(int priority) throw (cms::CMSException) {
this->priority = priority;
}
@@ -191,7 +193,7 @@ namespace cmsutil {
*
* @return int based priority level
*/
- virtual int getPriority() const throw ( cms::CMSException ) {
+ virtual int getPriority() const throw (cms::CMSException) {
return priority;
}
@@ -203,7 +205,7 @@ namespace cmsutil {
* @param time
* default time to live value in milliseconds
*/
- virtual void setTimeToLive( long long time ) throw ( cms::CMSException ) {
+ virtual void setTimeToLive(long long time) throw (cms::CMSException) {
ttl = time;
}
@@ -212,10 +214,17 @@ namespace cmsutil {
*
* @return Time to live value in milliseconds
*/
- virtual long long getTimeToLive() const throw ( cms::CMSException ) {
+ virtual long long getTimeToLive() const throw (cms::CMSException) {
return ttl;
}
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return transformer;
+ }
+
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->transformer = transformer;
+ }
};
}}
View
125 activemq-cpp/src/test/activemq/cmsutil/DummySession.h
@@ -35,101 +35,110 @@ namespace cmsutil {
cms::Session::AcknowledgeMode mode;
MessageContext* messageContext;
+ cms::MessageTransformer* transformer;
public:
DummySession(MessageContext* messageContext) {
this->mode = cms::Session::AUTO_ACKNOWLEDGE;
this->messageContext = messageContext;
+ this->transformer = NULL;
}
- virtual ~DummySession() {}
+ virtual ~DummySession() {
+ }
- virtual void close() {}
+ virtual void close() {
+ }
- virtual void start() {}
+ virtual void start() {
+ }
- virtual void stop() {}
+ virtual void stop() {
+ }
- virtual void commit() {}
+ virtual void commit() {
+ }
- virtual void rollback() {}
+ virtual void rollback() {
+ }
- virtual void recover() {}
+ virtual void recover() {
+ }
- virtual cms::MessageConsumer* createConsumer(
- const cms::Destination* destination ) {
+ virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination) {
return new DummyConsumer(messageContext, destination, "", false);
}
- virtual cms::MessageConsumer* createConsumer(
- const cms::Destination* destination,
- const std::string& selector )
- throw ( cms::CMSException ) {
+ virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination, const std::string& selector) throw (cms::CMSException) {
return new DummyConsumer(messageContext, destination, selector, false);
}
- virtual cms::MessageConsumer* createConsumer(
- const cms::Destination* destination,
- const std::string& selector,
- bool noLocal )
- throw ( cms::CMSException ) {
+ virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination, const std::string& selector, bool noLocal) throw (cms::CMSException) {
return new DummyConsumer(messageContext, destination, selector, noLocal);
}
- virtual cms::MessageConsumer* createDurableConsumer(
- const cms::Topic* destination,
- const std::string& name,
- const std::string& selector,
- bool noLocal = false ) { return NULL; }
+ virtual cms::MessageConsumer* createDurableConsumer(const cms::Topic* destination, const std::string& name, const std::string& selector,
+ bool noLocal = false) {
+ return NULL;
+ }
- virtual cms::MessageProducer* createProducer( const cms::Destination* destination )
- { return new DummyProducer(messageContext, destination); }
+ virtual cms::MessageProducer* createProducer(const cms::Destination* destination) {
+ return new DummyProducer(messageContext, destination);
+ }
- virtual cms::QueueBrowser* createBrowser( const cms::Queue* queue )
- { return NULL; }
+ virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue) {
+ return NULL;
+ }
- virtual cms::QueueBrowser* createBrowser( const cms::Queue* queue, const std::string& selector )
- { return NULL; }
+ virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector) {
+ return NULL;
+ }
- virtual cms::Queue* createQueue( const std::string& queueName )
- {
- return new activemq::commands::ActiveMQQueue( queueName );
+ virtual cms::Queue* createQueue(const std::string& queueName) {
+ return new activemq::commands::ActiveMQQueue(queueName);
}
- virtual cms::Topic* createTopic( const std::string& topicName ) {
- return new activemq::commands::ActiveMQTopic( topicName );
+ virtual cms::Topic* createTopic(const std::string& topicName) {
+ return new activemq::commands::ActiveMQTopic(topicName);
}
- virtual cms::TemporaryQueue* createTemporaryQueue()
- { return NULL; }
+ virtual cms::TemporaryQueue* createTemporaryQueue() {
+ return NULL;
+ }
- virtual cms::TemporaryTopic* createTemporaryTopic()
- { return NULL; }
+ virtual cms::TemporaryTopic* createTemporaryTopic() {
+ return NULL;
+ }
- virtual cms::Message* createMessage()
- { return NULL; }
+ virtual cms::Message* createMessage() {
+ return NULL;
+ }
- virtual cms::BytesMessage* createBytesMessage()
- { return NULL; }
+ virtual cms::BytesMessage* createBytesMessage() {
+ return NULL;
+ }
- virtual cms::BytesMessage* createBytesMessage(
- const unsigned char* bytes, int bytesSize ) {
+ virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize) {
return NULL;
}
- virtual cms::StreamMessage* createStreamMessage()
- throw ( cms::CMSException ){ return NULL; }
+ virtual cms::StreamMessage* createStreamMessage() throw (cms::CMSException) {
+ return NULL;
+ }
- virtual cms::TextMessage* createTextMessage()
- throw ( cms::CMSException ){ return NULL; }
+ virtual cms::TextMessage* createTextMessage() throw (cms::CMSException) {
+ return NULL;
+ }
- virtual cms::TextMessage* createTextMessage( const std::string& text )
- throw ( cms::CMSException ){ return NULL; }
+ virtual cms::TextMessage* createTextMessage(const std::string& text) throw (cms::CMSException) {
+ return NULL;
+ }
- virtual cms::MapMessage* createMapMessage()
- throw ( cms::CMSException ){ return NULL; }
+ virtual cms::MapMessage* createMapMessage() throw (cms::CMSException) {
+ return NULL;
+ }
virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const {
return mode;
@@ -140,11 +149,19 @@ namespace cmsutil {
}
virtual bool isTransacted() const {
- return mode==cms::Session::SESSION_TRANSACTED;
+ return mode == cms::Session::SESSION_TRANSACTED;
}
- virtual void unsubscribe( const std::string& name ) {}
+ virtual void unsubscribe(const std::string& name) {
+ }
+
+ virtual cms::MessageTransformer* getMessageTransformer() const {
+ return transformer;
+ }
+ virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
+ this->transformer = transformer;
+ }
};
}}

0 comments on commit b528e8a

Please sign in to comment.