From f0dd99cb1ac0e2bb220ca45f82623e53b9025d8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Sat, 5 Jan 2019 09:48:24 +0000 Subject: [PATCH] Extract --- .../activemq/artemis/core}/PriorityAware.java | 8 +- .../collections/ArrayResettableIterator.java | 58 ++ .../utils/collections/MultiIterator.java | 73 +++ .../collections/MultiResettableIterator.java | 38 ++ .../utils/collections/PriorityCollection.java | 328 ++++++++++ .../utils/collections/ResettableIterator.java | 8 +- .../utils/collections/SingletonIterator.java | 61 ++ .../artemis/core/server/Consumer.java | 6 + .../core/server/impl/QueueConsumers.java | 13 +- .../core/server/impl/QueueConsumersImpl.java | 591 ++---------------- .../artemis/core/server/impl/QueueImpl.java | 40 +- .../server/impl/QueueConsumersImplTest.java | 2 +- 12 files changed, 636 insertions(+), 590 deletions(-) rename {artemis-server/src/main/java/org/apache/activemq/artemis/core/server => artemis-commons/src/main/java/org/apache/activemq/artemis/core}/PriorityAware.java (78%) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java rename artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java => artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java (82%) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PriorityAware.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java similarity index 78% rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PriorityAware.java rename to artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java index c719d8472626..e90015a1c1ec 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/PriorityAware.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java @@ -14,13 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.core.server; - -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +package org.apache.activemq.artemis.core; public interface PriorityAware { - default int getPriority() { - return ActiveMQDefaultConfiguration.getDefaultConsumerPriority(); - } + int getPriority(); } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java new file mode 100644 index 000000000000..680489a7d3d4 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +public class ArrayResettableIterator implements ResettableIterator { + + private final Object[] array; + private int cursor = 0; + private int endPos = -1; + private boolean hasNext; + + public ArrayResettableIterator(Object[] array) { + this.array = array; + reset(); + } + + @Override + public ResettableIterator reset() { + endPos = cursor; + hasNext = array.length > 0; + return this; + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public T next() { + if (!hasNext) { + throw new IllegalStateException(); + } + @SuppressWarnings("unchecked") T result = (T) array[cursor]; + cursor++; + if (cursor == array.length) { + cursor = 0; + } + if (cursor == endPos) { + hasNext = false; + } + return result; + } +} \ No newline at end of file diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java new file mode 100644 index 000000000000..8a7419b98548 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +public class MultiIterator implements Iterator { + + private final Iterator[] iterators; + int index = -1; + + public MultiIterator(Iterator[] iterators) { + this.iterators = iterators; + } + + @Override + public boolean hasNext() { + while (true) { + if (index != -1) { + Iterator currentIterator = get(index); + if (currentIterator.hasNext()) { + return true; + } + } + int next = index + 1; + if (next < iterators.length) { + moveTo(next); + } else { + return false; + } + } + } + + @Override + public T next() { + while (true) { + if (index != -1) { + Iterator currentIterator = get(index); + if (currentIterator.hasNext()) { + return currentIterator.next(); + } + } + int next = index + 1; + if (next < iterators.length) { + moveTo(next); + } else { + return null; + } + } + } + + protected void moveTo(int index) { + this.index = index; + } + + protected Iterator get(int index) { + return iterators[index]; + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java new file mode 100644 index 000000000000..d51d9b43c054 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +public class MultiResettableIterator extends MultiIterator implements ResettableIterator { + + public MultiResettableIterator(ResettableIterator[] iterators) { + super(iterators); + } + + @Override + protected void moveTo(int index) { + super.moveTo(index); + if (index > -1) { + ((ResettableIterator) get(index)).reset(); + } + } + + @Override + public ResettableIterator reset() { + moveTo(-1); + return this; + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java new file mode 100644 index 000000000000..41147e9ad9ee --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. + * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] priorityHolders = getArray(); + return Arrays.stream(priorityHolders).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] priorityHolders = this.getArray(); + int size = priorityHolders.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = priorityHolders[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] priorityHolders = this.getArray(); + int size = priorityHolders.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(priorityHolders[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + return (ResettableIterator[]) Array.newInstance(ResettableIterator.class, length); + } + + @Override + public void forEach(Consumer action) { + Objects.requireNonNull(action); + PriorityHolder[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].getValues().forEach(action); + } + } + + private Collection getCollection(int priority, boolean createIfMissing) { + PriorityHolder[] current = getArray(); + int low = 0; + int high = current.length - 1; + + while (low <= high) { + int mid = (low + high) >>> 1; + PriorityHolder midVal = current[mid]; + + if (midVal.getPriority() > priority) + low = mid + 1; + else if (midVal.getPriority() < priority) + high = mid - 1; + else + return midVal.getValues(); //key found + } + + if (createIfMissing) { + PriorityHolder[] newLevels = newPrioritySetArrayInstance(current.length + 1); + if (low > 0) { + System.arraycopy(current, 0, newLevels, 0, low); + } + if (current.length - low > 0) { + System.arraycopy(current, low, newLevels, low + 1, current.length - low); + } + newLevels[low] = new PriorityHolder(priority, supplier); + setArray(newLevels); + return newLevels[low].getValues(); + } + return null; + } + + @Override + public synchronized boolean add(T t) { + boolean result = addInternal(t); + calcSize(); + return result; + } + + private boolean addInternal(T t) { + if (t == null) return false; + Collection priority = getCollection(t.getPriority(), true); + return priority.add(t); + } + + @Override + public boolean remove(Object o) { + return o instanceof PriorityAware && remove((PriorityAware) o); + } + + public synchronized boolean remove(PriorityAware priorityAware) { + boolean result = removeInternal(priorityAware); + calcSize(); + return result; + } + + private boolean removeInternal(PriorityAware priorityAware) { + if ( priorityAware == null) return false; + Collection priority = getCollection(priorityAware.getPriority(), false); + boolean result = priority != null && priority.remove(priorityAware); + if (priority != null && priority.size() == 0) { + removeCollection(priorityAware.getPriority()); + } + return result; + } + + private Collection removeCollection(int priority) { + PriorityHolder[] current = getArray(); + int len = current.length; + int low = 0; + int high = len - 1; + + while (low <= high) { + int mid = (low + high) >>> 1; + PriorityHolder midVal = current[mid]; + + if (midVal.getPriority() > priority) + low = mid + 1; + else if (midVal.getPriority() < priority) + high = mid - 1; + else { + PriorityHolder[] newLevels = newPrioritySetArrayInstance(len - 1); + System.arraycopy(current, 0, newLevels, 0, mid); + System.arraycopy(current, mid + 1, newLevels, mid, len - mid - 1); + setArray(newLevels); + return midVal.getValues(); //key found + } + } + return null; + } + + @Override + public boolean containsAll(Collection c) { + Objects.requireNonNull(c); + for (Object e : c) + if (!contains(e)) + return false; + return true; + } + + @Override + public synchronized boolean addAll(Collection c) { + Objects.requireNonNull(c); + boolean modified = false; + for (T e : c) + if (addInternal(e)) + modified = true; + calcSize(); + return modified; + } + + @Override + public synchronized boolean removeAll(Collection c) { + Objects.requireNonNull(c); + boolean modified = false; + for (Object o : c) { + if (remove(o)) { + modified = true; + } + } + calcSize(); + return modified; + } + + @Override + public synchronized boolean retainAll(Collection c) { + Objects.requireNonNull(c); + boolean modified = false; + PriorityHolder[] levels = getArray(); + for (PriorityHolder level : levels) { + if (level.getValues().retainAll(c)) { + modified = true; + } + } + calcSize(); + return modified; + } + + @Override + public synchronized void clear() { + PriorityHolder[] levels = getArray(); + for (PriorityHolder level : levels) { + level.getValues().clear(); + } + calcSize(); + } + + @Override + public boolean contains(Object o) { + return o instanceof PriorityAware && contains((PriorityAware) o); + } + + public boolean contains(PriorityAware priorityAware) { + if (priorityAware == null) return false; + Collection prioritySet = getCollection(priorityAware.getPriority(), false); + return prioritySet != null && prioritySet.contains(priorityAware); + } + + private void calcSize() { + PriorityHolder[] current = getArray(); + int size = 0; + for (PriorityHolder level : current) { + size += level.getValues().size(); + } + this.size = size; + } + + public static class PriorityHolder implements PriorityAware { + + private final int priority; + + private final Collection values; + + public PriorityHolder(int priority, Supplier> supplier) { + this.priority = priority; + this.values = supplier.get(); + } + + @Override + public int getPriority() { + return priority; + } + + public Collection getValues() { + return values; + } + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java similarity index 82% rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java index b221235fc7a8..c4e56cf3d5cb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,16 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.core.server.impl; +package org.apache.activemq.artemis.utils.collections; import java.util.Iterator; -public interface ResetableIterator extends Iterator { +public interface ResettableIterator extends Iterator { /** * Resets the iterator so you can re-iterate over all elements. * * @return itself, this is just for convenience. */ - ResetableIterator reset(); + ResettableIterator reset(); } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java new file mode 100644 index 000000000000..956e045e2304 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.function.Consumer; + +public class SingletonIterator implements Iterator { + + private E value; + + public static Iterator newInstance(E e) { + return new SingletonIterator<>(e); + } + + private SingletonIterator(E value) { + this.value = value; + } + + @Override + public boolean hasNext() { + return value != null; + } + + @Override + public E next() { + if (value != null) { + E result = value; + value = null; + return result; + } else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + value = null; + } + + @Override + public void forEachRemaining(Consumer action) { + if (value != null) + action.accept(value); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java index 04df32194e8e..ea8f1040590a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java @@ -18,6 +18,8 @@ import java.util.List; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.PriorityAware; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -81,4 +83,8 @@ default boolean supportsDirectDelivery() { /** an unique sequential ID for this consumer */ long sequentialID(); + @Override + default int getPriority() { + return ActiveMQDefaultConfiguration.getDefaultConsumerPriority(); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java index 5c755149e27c..f36e7c93406f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java @@ -16,12 +16,11 @@ */ package org.apache.activemq.artemis.core.server.impl; -import org.apache.activemq.artemis.core.server.PriorityAware; +import org.apache.activemq.artemis.core.PriorityAware; -import java.util.Collection; import java.util.Set; -public interface QueueConsumers extends Collection { +public interface QueueConsumers extends Iterable { Set getPriorites(); @@ -31,4 +30,12 @@ public interface QueueConsumers extends Collection { QueueConsumers reset(); + boolean add(T t); + + boolean remove(T t); + + int size(); + + boolean isEmpty(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java index 9dbe089d1583..ee75fd9d22c4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java @@ -16,17 +16,18 @@ */ package org.apache.activemq.artemis.core.server.impl; -import org.apache.activemq.artemis.core.server.PriorityAware; +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; -import java.lang.reflect.Array; -import java.util.AbstractCollection; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; -import java.util.Objects; import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; -import java.util.stream.Collectors; /** * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of @@ -48,592 +49,80 @@ * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, * but intent is this is the QueueImpl:ConsumerHolder. */ -public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { +public class QueueConsumersImpl implements QueueConsumers { - private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); - - private volatile Level[] levels; - private volatile int size; - private volatile T first; - - private void setArray(Level[] array) { - this.levels = array; - } - - private Level[] getArray() { - return levels; - } - - - public QueueConsumersImpl() { - levels = newLevelArrayInstance(0); - } - - @SuppressWarnings("unchecked") - private static Level[] newLevelArrayInstance(int length) { - return (Level[]) Array.newInstance(Level.class, length); - } - - @Override - public int size() { - return size; - } - - @Override - public boolean isEmpty() { - return size() == 0; - } + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); @Override public Set getPriorites() { - Level[] levels = getArray(); - return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); - } - - @Override - public Iterator iterator() { - return new QueueConsumersIterator<>(this, false); + return consumers.getPriorites(); } @Override public boolean hasNext() { - return iterator.hasNext(); + return currentIterator.hasNext(); } @Override public T next() { - return iterator.next(); + return currentIterator.next(); } @Override public QueueConsumers reset() { - iterator.reset(); + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); + } return this; } @Override - public void forEach(Consumer action) { - Objects.requireNonNull(action); - Level[] current = getArray(); - int len = current.length; - for (int i = 0; i < len; ++i) { - current[i].forEach(action); - } - } - - private Level getLevel(int level, boolean createIfMissing) { - Level[] current = getArray(); - int low = 0; - int high = current.length - 1; - - while (low <= high) { - int mid = (low + high) >>> 1; - Level midVal = current[mid]; - - if (midVal.level() > level) - low = mid + 1; - else if (midVal.level() < level) - high = mid - 1; - else - return midVal; //key found - } - - if (createIfMissing) { - Level[] newLevels = newLevelArrayInstance(current.length + 1); - if (low > 0) { - System.arraycopy(current, 0, newLevels, 0, low); - } - if (current.length - low > 0) { - System.arraycopy(current, low, newLevels, low + 1, current.length - low); - } - newLevels[low] = new Level(level); - setArray(newLevels); - return newLevels[low]; + public boolean add(T t) { + boolean result = consumers.add(t); + if(result) { + changedIteratorFieldUpdater.set(this, consumers.resettableIterator()); } - return null; - } - - @Override - public synchronized boolean add(T t) { - boolean result = addInternal(t); - calcSize(); return result; } - private boolean addInternal(T t) { - if (t == null) return false; - Level level = getLevel(t.getPriority(), true); - return level.add(t); - } - @Override - public boolean remove(Object o) { - return o instanceof PriorityAware && remove((PriorityAware) o); - } - - public synchronized boolean remove(PriorityAware priorityAware) { - boolean result = removeInternal(priorityAware); - calcSize(); - return result; - } - - private boolean removeInternal(PriorityAware priorityAware) { - if ( priorityAware == null) return false; - Level level = getLevel(priorityAware.getPriority(), false); - boolean result = level != null && level.remove(priorityAware); - if (level != null && level.size() == 0) { - removeLevel(level.level); + public boolean remove(T t) { + boolean result = consumers.remove(t); + if(result) { + changedIteratorFieldUpdater.set(this, consumers.resettableIterator()); } return result; } - private Level removeLevel(int level) { - Level[] current = getArray(); - int len = current.length; - int low = 0; - int high = len - 1; - - while (low <= high) { - int mid = (low + high) >>> 1; - Level midVal = current[mid]; - - if (midVal.level() > level) - low = mid + 1; - else if (midVal.level() < level) - high = mid - 1; - else { - Level[] newLevels = newLevelArrayInstance(len - 1); - System.arraycopy(current, 0, newLevels, 0, mid); - System.arraycopy(current, mid + 1, newLevels, mid, len - mid - 1); - setArray(newLevels); - return midVal; //key found - } - } - return null; - } - @Override - public boolean containsAll(Collection c) { - Objects.requireNonNull(c); - for (Object e : c) - if (!contains(e)) - return false; - return true; - } - - @Override - public synchronized boolean addAll(Collection c) { - Objects.requireNonNull(c); - boolean modified = false; - for (T e : c) - if (addInternal(e)) - modified = true; - calcSize(); - return modified; + public int size() { + return consumers.size(); } @Override - public synchronized boolean removeAll(Collection c) { - Objects.requireNonNull(c); - boolean modified = false; - for (Object o : c) { - if (remove(o)) { - modified = true; - } - } - calcSize(); - return modified; + public boolean isEmpty() { + return consumers.isEmpty(); } @Override - public synchronized boolean retainAll(Collection c) { - Objects.requireNonNull(c); - boolean modified = false; - Level[] levels = getArray(); - for (Level level : levels) { - if (level.retainAll(c)) { - modified = true; - } - } - calcSize(); - return modified; + public Iterator iterator() { + return unmodifiableConsumers.iterator(); } @Override - public synchronized void clear() { - Level[] levels = getArray(); - for (Level level : levels) { - level.clear(); - } - calcSize(); + public void forEach(Consumer action) { + unmodifiableConsumers.forEach(action); } - - @Override - public boolean contains(Object o) { - return o instanceof PriorityAware && contains((PriorityAware) o); - } - - public boolean contains(PriorityAware priorityAware) { - if (priorityAware == null) return false; - Level level = getLevel(priorityAware.getPriority(), false); - return level != null && level.contains(priorityAware); - } - - private void calcSize() { - Level[] current = getArray(); - int size = 0; - for (Level level : current) { - size += level.size(); - } - this.size = size; - } - - private static class QueueConsumersIterator implements ResetableIterator { - - private final QueueConsumersImpl queueConsumers; - private final boolean resetable; - private Level[] levels; - int level = -1; - private ResetableIterator currentIterator; - - private QueueConsumersIterator(QueueConsumersImpl queueConsumers, boolean resetable) { - this.queueConsumers = queueConsumers; - this.levels = queueConsumers.getArray(); - this.resetable = resetable; - - } - - @Override - public boolean hasNext() { - while (true) { - if (currentIterator != null) { - if (currentIterator.hasNext()) { - return true; - } - } - int nextLevel = level + 1; - if (levels != null && nextLevel < levels.length) { - moveToLevel(nextLevel); - } else { - return false; - } - } - } - - @Override - public T next() { - while (true) { - if (currentIterator != null) { - if (currentIterator.hasNext()) { - return currentIterator.next(); - } - } - int nextLevel = level + 1; - if (levels != null && nextLevel < levels.length) { - moveToLevel(nextLevel); - } else { - return null; - } - } - } - - private void moveToLevel(int level) { - Level level0 = levels[level]; - if (resetable) { - currentIterator = level0.resetableIterator().reset(); - } else { - currentIterator = level0.iterator(); - } - this.level = level; - } - - @Override - public ResetableIterator reset() { - if (!resetable) { - throw new IllegalStateException("Iterator is not resetable"); - } - levels = queueConsumers.getArray(); - level = -1; - currentIterator = null; - return this; - } - } - - /** - * This is represents a getPriority and is modeled on {@link java.util.concurrent.CopyOnWriteArrayList}. - * - * @param - */ - private static class Level { - - /** The array, accessed only via getArray/setArray. */ - private transient volatile Object[] array; - - private transient volatile ResetableIterator resetableIterator; - - private final int level; - - /** - * Gets the array. Non-private so as to also be accessible - * from CopyOnWriteArraySet class. - */ - private Object[] getArray() { - return array; - } - - /** - * Sets the array. - */ - private void setArray(Object[] a) { - array = a; - resetableIterator = new LevelResetableIterator<>(a); - } - - /** - * Creates an empty list. - */ - private Level(int level) { - setArray(new Object[0]); - this.level = level; - } - - public int level() { - return level; - } - - public void forEach(Consumer action) { - if (action == null) throw new NullPointerException(); - Object[] elements = getArray(); - for (Object element : elements) { - @SuppressWarnings("unchecked") E e = (E) element; - action.accept(e); - } - } - - /** - * Returns the number of elements in this list. - * - * @return the number of elements in this list - */ - public int size() { - return getArray().length; - } - - /** - * Returns {@code true} if this list contains no elements. - * - * @return {@code true} if this list contains no elements - */ - public boolean isEmpty() { - return size() == 0; - } - - /** - * Returns {@code true} if this list contains the specified element. - * More formally, returns {@code true} if and only if this list contains - * at least one element {@code e} such that - * (o==null ? e==null : o.equals(e)). - * - * @param o element whose presence in this list is to be tested - * @return {@code true} if this list contains the specified element - */ - public boolean contains(Object o) { - Object[] elements = getArray(); - return indexOf(o, elements, 0, elements.length) >= 0; - } - - /** - * Tests for equality, coping with nulls. - */ - private static boolean eq(Object o1, Object o2) { - return (o1 == null) ? o2 == null : o1.equals(o2); - } - - /** - * static version of indexOf, to allow repeated calls without - * needing to re-acquire array each time. - * @param o element to search for - * @param elements the array - * @param index first index to search - * @param fence one past last index to search - * @return index of element, or -1 if absent - */ - private static int indexOf(Object o, Object[] elements, - int index, int fence) { - if (o == null) { - for (int i = index; i < fence; i++) - if (elements[i] == null) - return i; - } else { - for (int i = index; i < fence; i++) - if (o.equals(elements[i])) - return i; - } - return -1; - } - - /** - * Appends the specified element to the end of this list. - * - * @param e element to be appended to this list - * @return {@code true} (as specified by {@link Collection#add}) - */ - public boolean add(E e) { - Object[] elements = getArray(); - int len = elements.length; - Object[] newElements = Arrays.copyOf(elements, len + 1); - newElements[len] = e; - setArray(newElements); - return true; - } - - /** - * Removes the first occurrence of the specified element from this list, - * if it is present. If this list does not contain the element, it is - * unchanged. More formally, removes the element with the lowest index - * {@code i} such that - * (o==null ? get(i)==null : o.equals(get(i))) - * (if such an element exists). Returns {@code true} if this list - * contained the specified element (or equivalently, if this list - * changed as a result of the call). - * - * @param o element to be removed from this list, if present - * @return {@code true} if this list contained the specified element - */ - public boolean remove(Object o) { - Object[] snapshot = getArray(); - int index = indexOf(o, snapshot, 0, snapshot.length); - return (index >= 0) && remove(o, snapshot, index); - } - - /** - * A version of remove(Object) using the strong hint that given - * recent snapshot contains o at the given index. - */ - private boolean remove(Object o, Object[] snapshot, int index) { - Object[] current = getArray(); - int len = current.length; - if (snapshot != current) - findIndex: { - int prefix = Math.min(index, len); - for (int i = 0; i < prefix; i++) { - if (current[i] != snapshot[i] && eq(o, current[i])) { - index = i; - break findIndex; - } - } - if (index >= len) - return false; - if (current[index] == o) - break findIndex; - index = indexOf(o, current, index, len); - if (index < 0) - return false; - } - Object[] newElements = new Object[len - 1]; - System.arraycopy(current, 0, newElements, 0, index); - System.arraycopy(current, index + 1, - newElements, index, - len - index - 1); - setArray(newElements); - return true; - } - - /** - * Retains only the elements in this list that are contained in the - * specified collection. In other words, removes from this list all of - * its elements that are not contained in the specified collection. - * - * @param c collection containing elements to be retained in this list - * @return {@code true} if this list changed as a result of the call - * @throws ClassCastException if the class of an element of this list - * is incompatible with the specified collection - * (optional) - * @throws NullPointerException if this list contains a null element and the - * specified collection does not permit null elements - * (optional), - * or if the specified collection is null - * @see #remove(Object) - */ - public boolean retainAll(Collection c) { - if (c == null) throw new NullPointerException(); - Object[] elements = getArray(); - int len = elements.length; - if (len != 0) { - // temp array holds those elements we know we want to keep - int newlen = 0; - Object[] temp = new Object[len]; - for (int i = 0; i < len; ++i) { - Object element = elements[i]; - if (c.contains(element)) - temp[newlen++] = element; - } - if (newlen != len) { - setArray(Arrays.copyOf(temp, newlen)); - return true; - } - } - return false; - } - - /** - * Removes all of the elements from this list. - * The list will be empty after this call returns. - */ - public void clear() { - setArray(new Object[0]); - } - - private ResetableIterator resetableIterator() { - return resetableIterator; - } - - public ResetableIterator iterator() { - return new LevelResetableIterator<>(getArray()); - } - - private static class LevelResetableIterator implements ResetableIterator { - - private final Object[] array; - private int cursor = 0; - private int endPos = -1; - private boolean hasNext; - - private LevelResetableIterator(Object[] array) { - this.array = array; - reset(); - } - - @Override - public ResetableIterator reset() { - endPos = cursor; - hasNext = array.length > 0; - return this; - } - - @Override - public boolean hasNext() { - return hasNext; - } - - @Override - public T next() { - if (!hasNext) { - throw new IllegalStateException(); - } - @SuppressWarnings("unchecked") T result = (T) array[cursor]; - cursor++; - if (cursor == array.length) { - cursor = 0; - } - if (cursor == endPos) { - hasNext = false; - } - return result; - } - } + public Spliterator spliterator() { + return unmodifiableConsumers.spliterator(); } - } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 31973763ae61..ef68e77def1d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -21,8 +21,6 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -43,7 +41,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.stream.Collectors; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -75,7 +72,7 @@ import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.PriorityAware; +import org.apache.activemq.artemis.core.PriorityAware; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -102,6 +99,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; +import org.apache.activemq.artemis.utils.collections.SingletonIterator; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; @@ -234,7 +232,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final AtomicInteger consumersCount = new AtomicInteger(); private volatile long consumerRemovedTimestamp = -1; - private final QueueConsumers> consumers = new QueueConsumersImpl<>(); + private final QueueConsumers> consumers = new QueueConsumersImpl<>(); private final Map groups = new HashMap<>(); @@ -1033,8 +1031,7 @@ public void addConsumer(final Consumer consumer) throws Exception { cancelRedistributor(); ConsumerHolder newConsumerHolder = new ConsumerHolder<>(consumer); - if (!consumers.contains(newConsumerHolder)) { - consumers.add(newConsumerHolder); + if (consumers.add(newConsumerHolder)) { int currentConsumerCount = consumers.size(); if (delayBeforeDispatch >= 0) { dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis()); @@ -1192,7 +1189,11 @@ public long getConsumerRemovedTimestamp() { @Override public Set getConsumers() { - return this.consumers.stream().map(ConsumerHolder::consumer).collect(Collectors.toSet()); + Set consumersSet = new HashSet<>(this.consumers.size()); + for (ConsumerHolder consumerHolder : consumers) { + consumersSet.add(consumerHolder.consumer); + } + return consumersSet; } @Override @@ -1364,34 +1365,23 @@ public synchronized List getScheduledMessages() { @Override public Map> getDeliveringMessages() { - - Collection consumerListClone = cloneConsumers(); + final Iterator> consumerHolderIterator; + synchronized (this) { + consumerHolderIterator = redistributor == null ? consumers.iterator() : SingletonIterator.newInstance(redistributor); + } Map> mapReturn = new HashMap<>(); - for (ConsumerHolder holder : consumerListClone) { + while (consumerHolderIterator.hasNext()) { + ConsumerHolder holder = consumerHolderIterator.next(); List msgs = holder.consumer.getDeliveringMessages(); if (msgs != null && msgs.size() > 0) { mapReturn.put(holder.consumer.toManagementString(), msgs); } } - return mapReturn; } - - private Collection cloneConsumers() { - Collection consumersClone; - synchronized (this) { - if (redistributor == null) { - consumersClone = new ArrayList<>(consumers); - } else { - consumersClone = Collections.singletonList(redistributor); - } - } - return consumersClone; - } - @Override public int getDeliveringCount() { return deliveringMetrics.getMessageCount(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java index 33676c4b571d..1055af74eb5a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java @@ -16,7 +16,7 @@ */ package org.apache.activemq.artemis.core.server.impl; -import org.apache.activemq.artemis.core.server.PriorityAware; +import org.apache.activemq.artemis.core.PriorityAware; import org.junit.Before; import org.junit.Test;