Skip to content

Commit

Permalink
This closes #2807
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Aug 21, 2019
2 parents 6fc1133 + 25d0b51 commit 17f8675
Show file tree
Hide file tree
Showing 21 changed files with 523 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.utils.collections;

import java.lang.reflect.Array;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.Objects;

Expand All @@ -43,8 +44,15 @@ public class LinkedListImpl<E> implements LinkedList<E> {

private int nextIndex;

private final Comparator<E> comparator;

public LinkedListImpl() {
this(null);
}

public LinkedListImpl(Comparator<E> comparator) {
iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE);
this.comparator = comparator;
}

@Override
Expand Down Expand Up @@ -84,6 +92,60 @@ public void addTail(E e) {
}
}

public void addSorted(E e) {
if (comparator == null) {
throw new NullPointerException("comparator=null");
}
if (size == 0) {
addHead(e);
} else {
if (comparator.compare(head.next.val(), e) < 0) {
addHead(e);
return;
}

// in our usage, most of the times we will just add to the end
// as the QueueImpl cancellations in AMQP will return the buffer back to the queue, in the order they were consumed.
// There is an exception to that case, when there are more messages on the queue.
// This would be an optimization for our usage.
// avoiding scanning the entire List just to add at the end, so we compare the end first.
if (comparator.compare(tail.val(), e) >= 0) {
addTail(e);
return;
}

Node<E> fetching = head.next;
while (fetching.next != null) {
int compareNext = comparator.compare(fetching.next.val(), e);
if (compareNext <= 0) {
addAfter(fetching, e);
return;
}
fetching = fetching.next;
}

// this shouldn't happen as the tail was compared before iterating
// the only possibilities for this to happen are:
// - there is a bug on the comparator
// - This method is buggy
// - The list wasn't properly synchronized as this list does't support concurrent access
//
// Also I'm not bothering about creating a Logger ID for this, because the only reason for this code to exist
// is because my OCD level is not letting this out.
throw new IllegalStateException("Cannot find a suitable place for your element, There's a mismatch in the comparator or there was concurrent adccess on the queue");
}
}

private void addAfter(Node<E> node, E e) {
Node<E> newNode = Node.with(e);
Node<E> nextNode = node.next;
node.next = newNode;
newNode.prev = node;
newNode.next = nextNode;
nextNode.prev = newNode;
size++;
}

@Override
public E poll() {
Node<E> ret = head.next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public interface PriorityLinkedList<T> {

void addTail(T t, int priority);

void addSorted(T t, int priority);

T poll();

void clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.utils.collections;

import java.lang.reflect.Array;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

Expand All @@ -40,10 +41,15 @@ public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {
private int lastPriority = -1;

public PriorityLinkedListImpl(final int priorities) {
this(priorities, null);
}


public PriorityLinkedListImpl(final int priorities, Comparator<T> comparator) {
levels = (LinkedListImpl<T>[]) Array.newInstance(LinkedListImpl.class, priorities);

for (int i = 0; i < priorities; i++) {
levels[i] = new LinkedListImpl<>();
levels[i] = new LinkedListImpl<>(comparator);
}
}

Expand Down Expand Up @@ -80,6 +86,15 @@ public void addTail(final T t, final int priority) {
exclusiveIncrementSize(1);
}

@Override
public void addSorted(T t, int priority) {
checkHighest(priority);

levels[priority].addSorted(t);

exclusiveIncrementSize(1);
}

@Override
public T poll() {
T t = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public AddressQueryResult addressQuery(SimpleString addressName,

public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
consumer.close(false);
consumer.close(false, true);
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
}

Expand Down Expand Up @@ -405,7 +405,7 @@ public void ack(Transaction transaction, Object brokerConsumer, Message message)
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
OperationContext oldContext = recoverContext();
try {
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts, true);
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
} finally {
resetContext(oldContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl {
private boolean discharged;

public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
super(xid, storageManager, timeoutSeconds);
super(xid, storageManager, timeoutSeconds, true);
addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int de
sendServerMessage(mqttid, message, deliveryCount, qos);
} else {
// Client must have disconnected and it's Subscription QoS cleared
consumer.individualCancel(message.getMessageID(), false);
consumer.individualCancel(message.getMessageID(), false, true);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ public interface Queue extends Bindable,CriticalComponent {

ReferenceCounter getConsumersRefCount();

/* Called when a message is cancelled back into the queue */
void addSorted(List<MessageReference> refs, boolean scheduling);

void reload(MessageReference ref);

void addTail(MessageReference ref);
Expand All @@ -154,6 +157,9 @@ public interface Queue extends Bindable,CriticalComponent {

void addHead(MessageReference ref, boolean scheduling);

/* Called when a message is cancelled back into the queue */
void addSorted(MessageReference ref, boolean scheduling);

void addHead(List<MessageReference> refs, boolean scheduling);

void acknowledge(MessageReference ref) throws Exception;
Expand All @@ -172,7 +178,8 @@ public interface Queue extends Bindable,CriticalComponent {

void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck);

void cancel(MessageReference reference, long timeBase) throws Exception;
/** @param sorted it should use the messageID as a reference to where to add it in the queue */
void cancel(MessageReference reference, long timeBase, boolean sorted) throws Exception;

void deliverAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {

void close(boolean failed) throws Exception;

void close(boolean failed, boolean sorted) throws Exception;

/**
* This method is just to remove itself from Queues.
* If for any reason during a close an exception occurred, the exception treatment
Expand Down Expand Up @@ -98,7 +100,7 @@ List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove,

void reject(long messageID) throws Exception;

void individualCancel(long messageID, boolean failed) throws Exception;
void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception;

void forceDelivery(long sequence);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ private void cancelRefs() {
refqueue = ref.getQueue();

try {
refqueue.cancel(ref, timeBase);
refqueue.cancel(ref, timeBase, false);
} catch (Exception e) {
// There isn't much we can do besides log an error
ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.impl;

import java.util.Comparator;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;

Expand All @@ -33,6 +34,28 @@
*/
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {

private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();

public static Comparator<MessageReference> getIDComparator() {
return idComparator;
}

private static class MessageReferenceComparatorByID implements Comparator<MessageReference> {

@Override
public int compare(MessageReference o1, MessageReference o2) {
long value = o2.getMessage().getMessageID() - o1.getMessage().getMessageID();
if (value > 0) {
return 1;
} else if (value < 0) {
return -1;
} else {
return 0;
}
}
}


private static final AtomicIntegerFieldUpdater<MessageReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(MessageReferenceImpl.class, "deliveryCount");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);

// This is where messages are stored
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES);
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());

// The quantity of pagedReferences on messageReferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
Expand Down Expand Up @@ -894,6 +894,25 @@ public void addHead(final MessageReference ref, boolean scheduling) {
}
}

/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final MessageReference ref, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
}

internalAddSorted(ref);

directDeliver = false;
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
}
}

/* Called when a message is cancelled back into the queue */
@Override
public void addHead(final List<MessageReference> refs, boolean scheduling) {
Expand All @@ -913,6 +932,25 @@ public void addHead(final List<MessageReference> refs, boolean scheduling) {
}
}

/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final List<MessageReference> refs, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
for (MessageReference ref : refs) {
addSorted(ref, scheduling);
}

resetAllIterators();

deliverAsync();
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
}
}

@Override
public synchronized void reload(final MessageReference ref) {
queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
Expand Down Expand Up @@ -1631,11 +1669,15 @@ public void cancel(final Transaction tx, final MessageReference reference, boole
}

@Override
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
public synchronized void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
if (redeliveryResult.getA()) {
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
internalAddHead(reference);
if (sorted) {
internalAddSorted(reference);
} else {
internalAddHead(reference);
}
}

resetAllIterators();
Expand Down Expand Up @@ -2469,6 +2511,23 @@ private void internalAddHead(final MessageReference ref) {
messageReferences.addHead(ref, priority);
}

/**
* The caller of this method requires synchronized on the queue.
* I'm not going to add synchronized to this method just for a precaution,
* as I'm not 100% sure this won't cause any extra runtime.
*
* @param ref
*/
private void internalAddSorted(final MessageReference ref) {
queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
pendingMetrics.incrementMetrics(ref);
refAdded(ref);

int priority = getPriority(ref);

messageReferences.addSorted(ref, priority);
}

private int getPriority(MessageReference ref) {
try {
return ref.getMessage().getPriority();
Expand Down Expand Up @@ -3440,13 +3499,21 @@ public void postAcknowledge(final MessageReference ref, AckReason reason) {
}

void postRollback(final LinkedList<MessageReference> refs) {
postRollback(refs, false);
}

void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {
//if we have purged then ignore adding the messages back
if (purgeOnNoConsumers && getConsumerCount() == 0) {
purgeAfterRollback(refs);

return;
}
addHead(refs, false);
if (sorted) {
addSorted(refs, false);
} else {
addHead(refs, false);
}
}

private void purgeAfterRollback(LinkedList<MessageReference> refs) {
Expand Down

0 comments on commit 17f8675

Please sign in to comment.