Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ public class QueueImpl implements Queue {

// We cache the consumers here since we don't want to include the redistributor

private final AtomicInteger consumersCount = new AtomicInteger();

private final Set<Consumer> consumerSet = new HashSet<>();

private final Map<SimpleString, Consumer> groups = new HashMap<>();
Expand Down Expand Up @@ -807,7 +809,9 @@ public void addConsumer(final Consumer consumer) throws Exception {

consumerList.add(new ConsumerHolder(consumer));

consumerSet.add(consumer);
if (consumerSet.add(consumer)) {
consumersCount.incrementAndGet();
}

if (refCountForConsumers != null) {
refCountForConsumers.increment();
Expand Down Expand Up @@ -837,7 +841,9 @@ public void removeConsumer(final Consumer consumer) {
pos = consumerList.size() - 1;
}

consumerSet.remove(consumer);
if (consumerSet.remove(consumer)) {
consumersCount.decrementAndGet();
}

LinkedList<SimpleString> groupsToRemove = null;

Expand Down Expand Up @@ -924,8 +930,8 @@ protected void finalize() throws Throwable {
}

@Override
public synchronized int getConsumerCount() {
return consumerSet.size();
public int getConsumerCount() {
return consumersCount.get();
}

@Override
Expand Down Expand Up @@ -1011,16 +1017,14 @@ public synchronized MessageReference getReference(final long id1) throws ActiveM

@Override
public long getMessageCount() {
synchronized (this) {
if (pageSubscription != null) {
// messageReferences will have depaged messages which we need to discount from the counter as they are
// counted on the pageSubscription as well
return messageReferences.size() + getScheduledCount() +
deliveringCount.get() +
pageSubscription.getMessageCount();
} else {
return messageReferences.size() + getScheduledCount() + deliveringCount.get();
}
if (pageSubscription != null) {
// messageReferences will have depaged messages which we need to discount from the counter as they are
// counted on the pageSubscription as well
return messageReferences.size() + getScheduledCount() +
deliveringCount.get() +
pageSubscription.getMessageCount();
} else {
return messageReferences.size() + getScheduledCount() + deliveringCount.get();
}
}

Expand Down