Permalink
Browse files

https://issues.apache.org/jira/browse/AMQCPP-367

Adds ConnectionAudit for use in dup detection and some more tests. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-cpp/trunk@1461355 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 502a550 commit c981a4c2f9093a934a702edc074870e4386ea5bb Timothy A. Bish committed Mar 26, 2013
@@ -106,6 +106,7 @@ cc_sources = \
activemq/core/ActiveMQXAConnectionFactory.cpp \
activemq/core/ActiveMQXASession.cpp \
activemq/core/AdvisoryConsumer.cpp \
+ activemq/core/ConnectionAudit.cpp \
activemq/core/DispatchData.cpp \
activemq/core/Dispatcher.cpp \
activemq/core/FifoMessageDispatchChannel.cpp \
@@ -743,6 +744,7 @@ h_sources = \
activemq/core/ActiveMQXAConnectionFactory.h \
activemq/core/ActiveMQXASession.h \
activemq/core/AdvisoryConsumer.h \
+ activemq/core/ConnectionAudit.h \
activemq/core/DispatchData.h \
activemq/core/Dispatcher.h \
activemq/core/FifoMessageDispatchChannel.h \
@@ -23,6 +23,7 @@
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/ActiveMQConnectionMetaData.h>
#include <activemq/core/AdvisoryConsumer.h>
+#include <activemq/core/ConnectionAudit.h>
#include <activemq/core/kernels/ActiveMQSessionKernel.h>
#include <activemq/core/kernels/ActiveMQProducerKernel.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
@@ -195,6 +196,8 @@ namespace core{
TempDestinationMap activeTempDestinations;
+ ConnectionAudit connectionAudit;
+
ConnectionConfig(const Pointer<transport::Transport> transport,
const Pointer<decaf::util::Properties> properties) :
properties(properties),
@@ -454,6 +457,8 @@ ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport> trans
configuration->connectionInfo->setManageable(true);
configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());
+ configuration->connectionAudit.setCheckForDuplicates(transport->isFaultTolerant());
+
this->config = configuration.release();
}
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionAudit.h"
+
+#include <decaf/util/LinkedHashMap.h>
+
+#include <activemq/core/ActiveMQMessageAudit.h>
+#include <activemq/commands/ActiveMQDestination.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace core {
+
+ class ConnectionAuditImpl {
+ private:
+
+ ConnectionAuditImpl(const ConnectionAuditImpl&);
+ ConnectionAuditImpl& operator= (const ConnectionAuditImpl&);
+
+ public:
+
+ Mutex mutex;
+ LinkedHashMap<Pointer<ActiveMQDestination>, Pointer<ActiveMQMessageAudit> > destinations;
+ LinkedHashMap<Pointer<Dispatcher>, Pointer<ActiveMQMessageAudit> > dispatchers;
+
+ ConnectionAuditImpl() : mutex(), destinations(1000), dispatchers(1000) {
+ }
+ };
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAudit::ConnectionAudit() : impl(new ConnectionAuditImpl),
+ checkForDuplicates(true),
+ auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
+ auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAudit::ConnectionAudit(int auditDepth, int maxProducers) :
+ impl(new ConnectionAuditImpl),
+ checkForDuplicates(true),
+ auditDepth(auditDepth),
+ auditMaximumProducerNumber(maxProducers) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAudit::~ConnectionAudit() {
+ try {
+ delete this->impl;
+ }
+ AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAudit::removeDispatcher(Pointer<Dispatcher> dispatcher) {
+ synchronized(&this->impl->mutex) {
+ this->impl->dispatchers.remove(dispatcher);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ConnectionAudit::isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message) {
+
+ if (checkForDuplicates && message != NULL) {
+ Pointer<ActiveMQDestination> destination = message->getDestination();
+ if (destination != NULL) {
+ if (destination->isQueue()) {
+ Pointer<ActiveMQMessageAudit> audit;
+ try {
+ audit = this->impl->destinations.get(destination);
+ } catch (NoSuchElementException& ex) {
+ audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
+ this->impl->destinations.put(destination, audit);
+ }
+ bool result = audit->isDuplicate(message->getMessageId());
+ return result;
+ }
+ Pointer<ActiveMQMessageAudit> audit;
+ try {
+ audit = this->impl->dispatchers.get(dispatcher);
+ } catch (NoSuchElementException& ex) {
+ audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
+ this->impl->dispatchers.put(dispatcher, audit);
+ }
+ bool result = audit->isDuplicate(message->getMessageId());
+ return result;
+ }
+ }
+ return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAudit::rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message) {
+ if (checkForDuplicates && message != NULL) {
+ Pointer<ActiveMQDestination> destination = message->getDestination();
+ if (destination != NULL) {
+ if (destination->isQueue()) {
+ try {
+ Pointer<ActiveMQMessageAudit> audit = this->impl->destinations.get(destination);
+ audit->rollback(message->getMessageId());
+ } catch (NoSuchElementException& ex) {}
+ } else {
+ try {
+ Pointer<ActiveMQMessageAudit> audit = this->impl->dispatchers.get(dispatcher);
+ audit->rollback(message->getMessageId());
+ } catch (NoSuchElementException& ex) {}
+ }
+ }
+ }
+}
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_
+#define _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_
+
+#include <activemq/util/Config.h>
+
+#include <activemq/commands/Message.h>
+#include <activemq/core/Dispatcher.h>
+
+namespace activemq {
+namespace core {
+
+ class ConnectionAuditImpl;
+
+ /**
+ * Provides the Auditing functionality used by Connections to attempt to
+ * filter out duplicate Messages.
+ *
+ * @since 3.7.0
+ */
+ class AMQCPP_API ConnectionAudit {
+ private:
+
+ ConnectionAudit(const ConnectionAudit&);
+ ConnectionAudit& operator= (const ConnectionAudit&);
+
+ private:
+
+ ConnectionAuditImpl* impl;
+
+ bool checkForDuplicates;
+ int auditDepth;
+ int auditMaximumProducerNumber;
+
+ public:
+
+ ConnectionAudit();
+
+ ConnectionAudit(int auditDepth, int maxProducers);
+
+ ~ConnectionAudit();
+
+ public:
+
+ void removeDispatcher(Pointer<Dispatcher> dispatcher);
+
+ bool isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message);
+
+ void rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message);
+
+ public:
+
+ bool isCheckForDuplicates() const {
+ return this->checkForDuplicates;
+ }
+
+ void setCheckForDuplicates(bool checkForDuplicates) {
+ this->checkForDuplicates = checkForDuplicates;
+ }
+
+ int getAuditDepth() {
+ return auditDepth;
+ }
+
+ void setAuditDepth(int auditDepth) {
+ this->auditDepth = auditDepth;
+ }
+
+ int getAuditMaximumProducerNumber() {
+ return auditMaximumProducerNumber;
+ }
+
+ void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+ this->auditMaximumProducerNumber = auditMaximumProducerNumber;
+ }
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_ */
@@ -38,6 +38,7 @@ cc_sources = \
activemq/core/ActiveMQConnectionTest.cpp \
activemq/core/ActiveMQMessageAuditTest.cpp \
activemq/core/ActiveMQSessionTest.cpp \
+ activemq/core/ConnectionAuditTest.cpp \
activemq/core/FifoMessageDispatchChannelTest.cpp \
activemq/core/SimplePriorityMessageDispatchChannelTest.cpp \
activemq/exceptions/ActiveMQExceptionTest.cpp \
@@ -286,6 +287,7 @@ h_sources = \
activemq/core/ActiveMQConnectionTest.h \
activemq/core/ActiveMQMessageAuditTest.h \
activemq/core/ActiveMQSessionTest.h \
+ activemq/core/ConnectionAuditTest.h \
activemq/core/FifoMessageDispatchChannelTest.h \
activemq/core/SimplePriorityMessageDispatchChannelTest.h \
activemq/exceptions/ActiveMQExceptionTest.h \
@@ -88,6 +88,58 @@ void ActiveMQMessageAuditTest::testIsDuplicateMessageId() {
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQMessageAuditTest::testRollbackString() {
+
+ int count = 10000;
+ ActiveMQMessageAudit audit;
+ IdGenerator idGen;
+
+ ArrayList<std::string> list;
+ for (int i = 0; i < count; i++) {
+ std::string id = idGen.generateId();
+ list.add(id);
+ CPPUNIT_ASSERT(!audit.isDuplicate(id));
+ }
+
+ int index = list.size() -1 -audit.getAuditDepth();
+ for (; index < list.size(); index++) {
+ std::string id = list.get(index);
+ CPPUNIT_ASSERT_MESSAGE("duplicate, id:" + id, audit.isDuplicate(id));
+ audit.rollback(id);
+ CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id, !audit.isDuplicate(id));
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQMessageAuditTest::testRollbackMessageId() {
+
+ int count = 10000;
+ ActiveMQMessageAudit audit;
+ ArrayList<Pointer<MessageId> > list;
+
+ Pointer<ProducerId> pid(new ProducerId);
+ pid->setConnectionId("test");
+ pid->setSessionId(0);
+ pid->setValue(1);
+
+ for (int i = 0; i < count; i++) {
+ Pointer<MessageId> id(new MessageId);
+ id->setProducerId(pid);
+ id->setProducerSequenceId(i);
+ list.add(id);
+ CPPUNIT_ASSERT(!audit.isDuplicate(id));
+ }
+
+ int index = list.size() -1 -audit.getAuditDepth();
+ for (; index < list.size(); index++) {
+ Pointer<MessageId> id = list.get(index);
+ CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), audit.isDuplicate(id));
+ audit.rollback(id);
+ CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), !audit.isDuplicate(id));
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQMessageAuditTest::testIsInOrderString() {
int count = 10000;
@@ -31,6 +31,8 @@ namespace core {
CPPUNIT_TEST( testIsDuplicateMessageId );
CPPUNIT_TEST( testIsInOrderString );
CPPUNIT_TEST( testIsInOrderMessageId );
+ CPPUNIT_TEST( testRollbackString );
+ CPPUNIT_TEST( testRollbackMessageId );
CPPUNIT_TEST( testGetLastSeqId );
CPPUNIT_TEST_SUITE_END();
@@ -43,6 +45,8 @@ namespace core {
void testIsDuplicateMessageId();
void testIsInOrderString();
void testIsInOrderMessageId();
+ void testRollbackString();
+ void testRollbackMessageId();
void testGetLastSeqId();
};
Oops, something went wrong.

0 comments on commit c981a4c

Please sign in to comment.