From 134ae96ff4553221120f3ba7c3261b28716be3b7 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Thu, 21 May 2015 19:47:22 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5792 Adding SubscriptionStatistics to group together all metrics in a bean for a subscription. --- .../broker/region/AbstractSubscription.java | 17 +-- .../region/DurableTopicSubscription.java | 6 +- .../broker/region/PrefetchSubscription.java | 35 +++--- .../activemq/broker/region/Subscription.java | 2 + .../broker/region/SubscriptionStatistics.java | 101 ++++++++++++++++++ .../broker/region/TopicSubscription.java | 39 ++++--- .../region/QueueDuplicatesFromStoreTest.java | 10 +- .../SubscriptionAddRemoveQueueTest.java | 8 +- 8 files changed, 167 insertions(+), 51 deletions(-) create mode 100644 activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 1ed2fae1cd8..37056a2f0f3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -53,7 +53,7 @@ public abstract class AbstractSubscription implements Subscription { private int cursorMemoryHighWaterMark = 70; private boolean slowConsumer; private long lastAckTime; - private AtomicLong consumedCount = new AtomicLong(); + private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics(); public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { this.broker = broker; @@ -89,7 +89,7 @@ private static BooleanExpression parseSelector(ConsumerInfo info) throws Invalid @Override public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { this.lastAckTime = System.currentTimeMillis(); - this.consumedCount.incrementAndGet(); + subscriptionStatistics.getConsumedCount().increment(); } @Override @@ -230,7 +230,7 @@ public boolean isBrowser() { @Override public int getInFlightUsage() { if (info.getPrefetchSize() > 0) { - return (getInFlightSize() * 100)/info.getPrefetchSize(); + return (getInFlightSize() * 100)/info.getPrefetchSize(); } return Integer.MAX_VALUE; } @@ -285,14 +285,19 @@ public void setTimeOfLastMessageAck(long value) { } public long getConsumedCount(){ - return consumedCount.get(); + return subscriptionStatistics.getConsumedCount().getCount(); } public void incrementConsumedCount(){ - consumedCount.incrementAndGet(); + subscriptionStatistics.getConsumedCount().increment(); } public void resetConsumedCount(){ - consumedCount.set(0); + subscriptionStatistics.getConsumedCount().reset(); + } + + @Override + public SubscriptionStatistics getSubscriptionStatistics() { + return subscriptionStatistics; } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 91e14f03ce3..d87bd5403f3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -121,11 +121,11 @@ public void add(ConnectionContext context, Destination destination) throws Excep if (active.get() || keepDurableSubsActive) { Topic topic = (Topic) destination; topic.activate(context, this); - this.enqueueCounter += pending.size(); + getSubscriptionStatistics().getEnqueues().add(pending.size()); } else if (destination.getMessageStore() != null) { TopicMessageStore store = (TopicMessageStore) destination.getMessageStore(); try { - this.enqueueCounter += store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName()); + getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName())); } catch (IOException e) { JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e); jmsEx.setLinkedException(e); @@ -325,7 +325,7 @@ protected void acknowledge(ConnectionContext context, MessageAck ack, MessageRef @Override public synchronized String toString() { return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" - + durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter + + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 0bbd000ce90..53a00c9ff3a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -58,9 +58,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { protected final List dispatched = new ArrayList(); protected final AtomicInteger prefetchExtension = new AtomicInteger(); protected boolean usePrefetchExtension = true; - protected long enqueueCounter; - protected long dispatchCounter; - protected long dequeueCounter; private int maxProducersToAudit=32; private int maxAuditDepth=2048; protected final SystemUsage usageManager; @@ -94,7 +91,7 @@ public Response pullMessage(ConnectionContext context, final MessagePull pull) t // consumers to 'wake them up' in case they were waiting for a message. if (getPrefetchSize() == 0) { prefetchExtension.set(pull.getQuantity()); - final long dispatchCounterBeforePull = dispatchCounter; + final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount(); // Have the destination push us some messages. for (Destination dest : destinations) { @@ -104,7 +101,7 @@ public Response pullMessage(ConnectionContext context, final MessagePull pull) t synchronized(this) { // If there was nothing dispatched.. we may need to setup a timeout. - if (dispatchCounterBeforePull == dispatchCounter || pull.isAlwaysSignalDone()) { + if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) { // immediate timeout used by receiveNoWait() if (pull.getTimeout() == -1) { // Null message indicates the pull is done or did not have pending. @@ -132,7 +129,7 @@ public void run() { */ final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) { synchronized (pendingLock) { - if (dispatchCounterBeforePull == dispatchCounter || alwaysSignalDone) { + if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) { try { prefetchExtension.set(1); add(QueueMessageReference.NULL_MESSAGE); @@ -157,7 +154,7 @@ public void add(MessageReference node) throws Exception { // Don't increment for the pullTimeout control message. if (!node.equals(QueueMessageReference.NULL_MESSAGE)) { - enqueueCounter++; + getSubscriptionStatistics().getEnqueues().increment(); } pending.addMessageLast(node); } @@ -227,7 +224,7 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a if (inAckRange) { // Don't remove the nodes until we are committed. if (!context.isInTransaction()) { - dequeueCounter++; + getSubscriptionStatistics().getDequeues().increment(); ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); removeList.add(node); } else { @@ -257,7 +254,7 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a if (ack.getLastMessageId().equals(messageId)) { // Don't remove the nodes until we are committed - immediateAck option if (!context.isInTransaction()) { - dequeueCounter++; + getSubscriptionStatistics().getDequeues().increment(); ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); dispatched.remove(node); } else { @@ -361,9 +358,9 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a sendToDLQ(context, node, ack.getPoisonCause()); Destination nodeDest = (Destination) node.getRegionDestination(); nodeDest.getDestinationStatistics() - .getInflight().decrement(); + .getInflight().decrement(); removeList.add(node); - dequeueCounter++; + getSubscriptionStatistics().getDequeues().increment(); index++; acknowledge(context, ack, node); if (ack.getLastMessageId().equals(messageId)) { @@ -428,7 +425,7 @@ public void afterCommit() throws Exception { Destination nodeDest = (Destination) node.getRegionDestination(); synchronized(dispatchLock) { - dequeueCounter++; + getSubscriptionStatistics().getDequeues().increment(); dispatched.remove(node); nodeDest.getDestinationStatistics().getInflight().decrement(); } @@ -550,17 +547,17 @@ public int getDispatchedQueueSize() { @Override public long getDequeueCounter() { - return dequeueCounter; + return getSubscriptionStatistics().getDequeues().getCount(); } @Override public long getDispatchedCounter() { - return dispatchCounter; + return getSubscriptionStatistics().getDispatched().getCount(); } @Override public long getEnqueueCounter() { - return enqueueCounter; + return getSubscriptionStatistics().getEnqueues().getCount(); } @Override @@ -632,7 +629,7 @@ private void updateDestinationStats(List rc, Destination desti // made public so it can be used in MQTTProtocolConverter public void dispatchPending() throws IOException { - synchronized(pendingLock) { + synchronized(pendingLock) { try { int numberToDispatch = countBeforeFull(); if (numberToDispatch > 0) { @@ -695,7 +692,7 @@ protected boolean dispatch(final MessageReference node) throws IOException { MessageDispatch md = createMessageDispatch(node, message); if (node != QueueMessageReference.NULL_MESSAGE) { - dispatchCounter++; + getSubscriptionStatistics().getDispatched().increment(); dispatched.add(node); } if (getPrefetchSize() == 0) { @@ -724,7 +721,7 @@ public void onFailure() { if (node != QueueMessageReference.NULL_MESSAGE) { nodeDest.getDestinationStatistics().getDispatched().increment(); nodeDest.getDestinationStatistics().getInflight().increment(); - LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() }); + LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); } } if (node instanceof QueueMessageReference) { @@ -746,7 +743,7 @@ protected void onDispatch(final MessageReference node, final Message message) { if (node != QueueMessageReference.NULL_MESSAGE) { nodeDest.getDestinationStatistics().getDispatched().increment(); nodeDest.getDestinationStatistics().getInflight().increment(); - LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() }); + LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java index ee0e2188fa5..ec37512b431 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -137,6 +137,8 @@ public interface Subscription extends SubscriptionRecovery { */ long getDequeueCounter(); + SubscriptionStatistics getSubscriptionStatistics(); + /** * @return the JMS selector on the current subscription */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java new file mode 100644 index 00000000000..09fab8a0fd4 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SubscriptionStatistics.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region; + +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.management.StatsImpl; + +/** + * The J2EE Statistics for a Subsription. + */ +public class SubscriptionStatistics extends StatsImpl { + + protected CountStatisticImpl consumedCount; + protected CountStatisticImpl enqueues; + protected CountStatisticImpl dequeues; + protected CountStatisticImpl dispatched; + + + public SubscriptionStatistics() { + this(true); + } + + public SubscriptionStatistics(boolean enabled) { + + consumedCount = new CountStatisticImpl("consumedCount", "The number of messages that have been consumed by the subscription"); + enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the subscription"); + dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the subscription"); + dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the subscription"); + + addStatistic("consumedCount", consumedCount); + addStatistic("enqueues", enqueues); + addStatistic("dispatched", dispatched); + addStatistic("dequeues", dequeues); + + this.setEnabled(enabled); + } + + public CountStatisticImpl getConsumedCount() { + return consumedCount; + } + + public CountStatisticImpl getEnqueues() { + return enqueues; + } + + public CountStatisticImpl getDequeues() { + return dequeues; + } + + public CountStatisticImpl getDispatched() { + return dispatched; + } + + public void reset() { + if (this.isDoReset()) { + super.reset(); + consumedCount.reset(); + enqueues.reset(); + dequeues.reset(); + dispatched.reset(); + } + } + + public void setEnabled(boolean enabled) { + super.setEnabled(enabled); + consumedCount.setEnabled(enabled); + enqueues.setEnabled(enabled); + dispatched.setEnabled(enabled); + dequeues.setEnabled(enabled); + } + + public void setParent(SubscriptionStatistics parent) { + if (parent != null) { + consumedCount.setParent(parent.consumedCount); + enqueues.setParent(parent.enqueues); + dispatched.setParent(parent.dispatched); + dequeues.setParent(parent.dequeues); + } else { + consumedCount.setParent(null); + enqueues.setParent(null); + dispatched.setParent(null); + dequeues.setParent(null); + } + } + +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index fe3d911822c..ba755ee7dad 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -53,8 +53,6 @@ public class TopicSubscription extends AbstractSubscription { protected PendingMessageCursor matched; protected final SystemUsage usageManager; - protected AtomicLong dispatchedCounter = new AtomicLong(); - boolean singleDestination = true; Destination destination; private final Scheduler scheduler; @@ -63,8 +61,6 @@ public class TopicSubscription extends AbstractSubscription { private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); private int discarded; private final Object matchedListMutex = new Object(); - private final AtomicLong enqueueCounter = new AtomicLong(0); - private final AtomicLong dequeueCounter = new AtomicLong(0); private final AtomicInteger prefetchExtension = new AtomicInteger(0); private int memoryUsageHighWaterMark = 95; // allow duplicate suppression in a ring network of brokers @@ -106,7 +102,7 @@ public void add(MessageReference node) throws Exception { // Lets use an indirect reference so that we can associate a unique // locator /w the message. node = new IndirectMessageReference(node.getMessage()); - enqueueCounter.incrementAndGet(); + getSubscriptionStatistics().getEnqueues().increment(); synchronized (matchedListMutex) { // if this subscriber is already discarding a message, we don't want to add // any more messages to it as those messages can only be advisories generated in the process, @@ -135,7 +131,7 @@ public void add(MessageReference node) throws Exception { while (matched.isFull()) { if (getContext().getStopping().get()) { LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId()); - enqueueCounter.decrementAndGet(); + getSubscriptionStatistics().getEnqueues().decrement(); return; } if (!warnedAboutWait) { @@ -231,7 +227,7 @@ protected void removeExpiredMessages() throws IOException { node.decrementReferenceCount(); if (broker.isExpired(node)) { matched.remove(); - dispatchedCounter.incrementAndGet(); + getSubscriptionStatistics().getDispatched().increment(); node.decrementReferenceCount(); ((Destination)node.getRegionDestination()).getDestinationStatistics().getExpired().increment(); broker.messageExpired(getContext(), node, this); @@ -253,7 +249,7 @@ public void processMessageDispatchNotification(MessageDispatchNotification mdn) node.decrementReferenceCount(); if (node.getMessageId().equals(mdn.getMessageId())) { matched.remove(); - dispatchedCounter.incrementAndGet(); + getSubscriptionStatistics().getDispatched().increment(); node.decrementReferenceCount(); break; } @@ -275,12 +271,12 @@ public synchronized void acknowledge(final ConnectionContext context, final Mess @Override public void afterCommit() throws Exception { - synchronized (TopicSubscription.this) { + synchronized (TopicSubscription.this) { if (singleDestination && destination != null) { destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); } } - dequeueCounter.addAndGet(ack.getMessageCount()); + getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); dispatchMatched(); } }); @@ -292,7 +288,7 @@ public void afterCommit() throws Exception { destination.getDestinationStatistics().getForwards().add(ack.getMessageCount()); } } - dequeueCounter.addAndGet(ack.getMessageCount()); + getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); } while (true) { int currentExtension = prefetchExtension.get(); @@ -314,7 +310,7 @@ public void afterCommit() throws Exception { destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); } - dequeueCounter.addAndGet(ack.getMessageCount()); + getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); while (true) { int currentExtension = prefetchExtension.get(); int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); @@ -337,12 +333,12 @@ public Response pullMessage(ConnectionContext context, final MessagePull pull) t // The slave should not deliver pull messages. if (getPrefetchSize() == 0) { - final long currentDispatchedCount = dispatchedCounter.get(); + final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount(); prefetchExtension.set(pull.getQuantity()); dispatchMatched(); // If there was nothing dispatched.. we may need to setup a timeout. - if (currentDispatchedCount == dispatchedCounter.get() || pull.isAlwaysSignalDone()) { + if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) { // immediate timeout used by receiveNoWait() if (pull.getTimeout() == -1) { @@ -371,7 +367,7 @@ public void run() { */ private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) { synchronized (matchedListMutex) { - if (currentDispatchedCount == dispatchedCounter.get() || alwaysSendDone) { + if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) { try { dispatch(null); } catch (Exception e) { @@ -390,7 +386,8 @@ public int getPendingQueueSize() { @Override public int getDispatchedQueueSize() { - return (int)(dispatchedCounter.get() - prefetchExtension.get() - dequeueCounter.get()); + return (int)(getSubscriptionStatistics().getDispatched().getCount() - + prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount()); } public int getMaximumPendingMessages() { @@ -399,17 +396,17 @@ public int getMaximumPendingMessages() { @Override public long getDispatchedCounter() { - return dispatchedCounter.get(); + return getSubscriptionStatistics().getDispatched().getCount(); } @Override public long getEnqueueCounter() { - return enqueueCounter.get(); + return getSubscriptionStatistics().getEnqueues().getCount(); } @Override public long getDequeueCounter() { - return dequeueCounter.get(); + return getSubscriptionStatistics().getDequeues().getCount(); } /** @@ -599,7 +596,7 @@ private void dispatch(final MessageReference node) throws IOException { md.setConsumerId(info.getConsumerId()); if (node != null) { md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); - dispatchedCounter.incrementAndGet(); + getSubscriptionStatistics().getDispatched().increment(); // Keep track if this subscription is receiving messages from a single destination. if (singleDestination) { if (destination == null) { @@ -667,7 +664,7 @@ private void discard(MessageReference message) { @Override public String toString() { return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" - + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); + + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index f69c380fa54..d692d03c9e7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.SubscriptionStatistics; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; @@ -101,7 +102,7 @@ public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Ex public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception { final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter(); final MessageStore queueMessageStore = - persistenceAdapter.createQueueMessageStore(destination); + persistenceAdapter.createQueueMessageStore(destination); final ConnectionContext contextNotInTx = new ConnectionContext(); final ConsumerInfo consumerInfo = new ConsumerInfo(); final DestinationStatistics destinationStatistics = new DestinationStatistics(); @@ -139,6 +140,8 @@ public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws // pull from store in small windows Subscription subscription = new Subscription() { + private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics(); + @Override public void add(MessageReference node) throws Exception { if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) { @@ -358,6 +361,11 @@ public void incrementConsumedCount(){ public void resetConsumedCount(){ } + + @Override + public SubscriptionStatistics getSubscriptionStatistics() { + return subscriptionStatistics; + } }; queue.addSubscription(contextNotInTx, subscription); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java index 69648426142..50c21363e06 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java @@ -173,8 +173,9 @@ private boolean hasSomeLocks(List dispatched) { public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner { + private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics(); List dispatched = - Collections.synchronizedList(new ArrayList()); + Collections.synchronizedList(new ArrayList()); public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { @@ -370,5 +371,10 @@ public int countBeforeFull() { return 10; } + @Override + public SubscriptionStatistics getSubscriptionStatistics() { + return subscriptionStatistics; + } + } }