Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-856 Test fixes #2209

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ public String createQueue(String address,
boolean purgeOnNoConsumers,
boolean autoCreateAddress) throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address == null ? name : address);
return createQueue(address, routingType, name, filterStr, durable, addressSettings.getDefaultMaxConsumers(), addressSettings.isDefaultPurgeOnNoConsumers(), addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), addressSettings.getDefaultConsumersBeforeDispatch(), addressSettings.getDefaultDelayBeforeDispatch(), addressSettings.isAutoCreateAddresses());
return createQueue(address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), addressSettings.getDefaultConsumersBeforeDispatch(), addressSettings.getDefaultDelayBeforeDispatch(), autoCreateAddress);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,7 @@ public void createSharedQueue(final SimpleString address,
boolean exclusive,
boolean lastValue) throws Exception {
AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? name.toString() : address.toString());
createSharedQueue(address, routingType, name, filterString, user, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch());
createSharedQueue(address, routingType, name, filterString, user, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {

private final List<ConsumerHolder> consumerList = new CopyOnWriteArrayList<>();

private int getConsumerListSize() {
return consumerList.size() + (redistributor != null ? 1 : 0);
}

private ConsumerHolder getConsumerList(int i) {
if (i == consumerList.size()) {
return redistributor;
} else {
return consumerList.get(i);
}
}

private final ScheduledDeliveryHandler scheduledDeliveryHandler;

private AtomicLong messagesAdded = new AtomicLong(0);
Expand Down Expand Up @@ -321,6 +333,10 @@ public String debug() {
out.println("consumer: " + holder.consumer.debug());
}

if (redistributor != null) {
out.println("Redistributor::" + redistributor);
}

for (MessageReference reference : intermediateMessageReferences) {
out.print("Intermediate reference:" + reference);
}
Expand Down Expand Up @@ -544,6 +560,7 @@ public boolean isExclusive() {

@Override
public synchronized void setExclusive(boolean exclusive) {
new Exception("exclusive set at " + exclusive).printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think this needs to be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. That’s debug I forgot :p

this.exclusive = exclusive;
}

Expand Down Expand Up @@ -1077,7 +1094,8 @@ public void removeConsumer(final Consumer consumer) {

private boolean checkConsumerDirectDeliver() {
boolean supports = true;
for (ConsumerHolder consumerCheck : consumerList) {
for (int i = 0; i < getConsumerListSize(); i++) {
ConsumerHolder consumerCheck = getConsumerList(i);
if (!consumerCheck.consumer.supportsDirectDelivery()) {
supports = false;
}
Expand Down Expand Up @@ -2157,7 +2175,8 @@ public synchronized int changeReferencesPriority(final Filter filter, final byte

@Override
public synchronized void resetAllIterators() {
for (ConsumerHolder holder : this.consumerList) {
for (int i = 0; i < getConsumerListSize(); i++) {
ConsumerHolder holder = getConsumerList(i);
if (holder.iter != null) {
holder.iter.close();
}
Expand Down Expand Up @@ -2382,24 +2401,23 @@ private void deliver() {
}

ConsumerHolder<? extends Consumer> holder;
if (redistributor == null) {
if (endPos < 0 || consumersChanged) {
consumersChanged = false;

if (endPos < 0 || consumersChanged) {
consumersChanged = false;
size = getConsumerListSize();

size = consumerList.size();
endPos = pos - 1;

endPos = pos - 1;

if (endPos < 0) {
endPos = size - 1;
noDelivery = 0;
}
if (endPos < 0) {
endPos = size - 1;
noDelivery = 0;
}
}

holder = consumerList.get(pos);
} else {
if (!canDispatch() && redistributor != null) {
holder = redistributor;
} else {
holder = getConsumerList(exclusive ? 0 : pos);
}

Consumer consumer = holder.consumer;
Expand Down Expand Up @@ -2446,10 +2464,6 @@ private void deliver() {
}
}

if (exclusive && redistributor == null) {
consumer = consumerList.get(0).consumer;
}

HandleStatus status = handle(ref, consumer);

if (status == HandleStatus.HANDLED) {
Expand Down Expand Up @@ -2987,13 +3001,13 @@ private boolean deliverDirect(final MessageReference ref) {

int startPos = pos;

int size = consumerList.size();
int size = getConsumerListSize();

while (true) {
ConsumerHolder<? extends Consumer> holder;
if (redistributor == null) {
holder = consumerList.get(pos);
} else {

ConsumerHolder holder = getConsumerList(exclusive ? 0 : pos);
if (!canDispatch() && redistributor != null) {
// if you can't dispatch, the only possible one is the redistributor
holder = redistributor;
}

Expand All @@ -3013,10 +3027,6 @@ private boolean deliverDirect(final MessageReference ref) {
}
}

if (exclusive && redistributor == null) {
consumer = consumerList.get(0).consumer;
}

// Only move onto the next position if the consumer on the current position was used.
if (!exclusive && groupConsumer == null) {
pos++;
Expand Down Expand Up @@ -3123,6 +3133,9 @@ private List<ConsumerHolder> cloneConsumersList() {

synchronized (this) {
consumerListClone = new ArrayList<>(consumerList);
if (redistributor != null) {
consumerListClone.add(redistributor);
}
}
return consumerListClone;
}
Expand Down