Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-196 Implement Consumer Priority #2490

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -32,6 +32,7 @@ public class QueueAttributes implements Serializable {
public static final String PURGE_ON_NO_CONSUMERS = "purge-on-no-consumers";
public static final String CONSUMERS_BEFORE_DISPATCH = "consumers-before-dispatch";
public static final String DELAY_BEFORE_DISPATCH = "delay-before-dispatch";
public static final String CONSUMER_PRIORITY = "consumer-priority";

private RoutingType routingType;
private SimpleString filterString;
Expand All @@ -44,6 +45,7 @@ public class QueueAttributes implements Serializable {
private Boolean purgeOnNoConsumers;
private Integer consumersBeforeDispatch;
private Long delayBeforeDispatch;
private Integer consumerPriority;

public void set(String key, String value) {
if (key != null && value != null) {
Expand All @@ -69,6 +71,8 @@ public void set(String key, String value) {
setConsumersBeforeDispatch(Integer.valueOf(value));
} else if (key.equals(DELAY_BEFORE_DISPATCH)) {
setDelayBeforeDispatch(Long.valueOf(value));
} else if (key.equals(CONSUMER_PRIORITY)) {
setConsumerPriority(Integer.valueOf(value));
}
}
}
Expand Down Expand Up @@ -172,4 +176,13 @@ public QueueAttributes setDelayBeforeDispatch(Long delayBeforeDispatch) {
return this;
}

public Integer getConsumerPriority() {
return consumerPriority;
}

public QueueAttributes setConsumerPriority(Integer consumerPriority) {
this.consumerPriority = consumerPriority;
return this;
}

}
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core;

public interface PriorityAware {

int getPriority();
}
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;

import java.util.Collection;

/**
* Provides an Array Iterator that is able to reset, allowing you to iterate over the full array.
* It achieves this though by moving end position mark to the the current cursors position,
* so it round robins, even with reset.
* @param <T>
*/
public class ArrayResettableIterator<T> implements ResettableIterator<T> {

private final Object[] array;
private int cursor = 0;
private int endPos = -1;
private boolean hasNext;

public ArrayResettableIterator(Object[] array) {
this.array = array;
reset();
}

public static <T> ResettableIterator<T> iterator(Collection<T> collection) {
return new ArrayResettableIterator<>(collection.toArray());
}

@Override
public void reset() {
endPos = cursor;
hasNext = array.length > 0;
}

@Override
public boolean hasNext() {
return hasNext;
}

@Override
public T next() {
if (!hasNext) {
throw new IllegalStateException();
}
@SuppressWarnings("unchecked") T result = (T) array[cursor];
cursor++;
if (cursor == array.length) {
cursor = 0;
}
if (cursor == endPos) {
hasNext = false;
}
return result;
}
}
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;

import java.util.Iterator;

/**
* Provides an Iterator that works over multiple underlying iterators.
*
* @param <T> type of the class of the iterator.
*/
public class MultiIterator<T> extends MultiIteratorBase<T, Iterator<T>> {

public MultiIterator(Iterator<T>[] iterators) {
michaelandrepearce marked this conversation as resolved.
Show resolved Hide resolved
super(iterators);
}
}
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;

import java.util.Iterator;

/**
* Provides an Abstract Iterator that works over multiple underlying iterators.
*
* @param <T> type of the class of the iterator.
* @param <I> type of the iterator
*/
abstract class MultiIteratorBase<T, I extends Iterator<T>> implements Iterator<T> {

private final I[] iterators;
private int index = -1;

MultiIteratorBase(I[] iterators) {
this.iterators = iterators;
}

@Override
public boolean hasNext() {
while (true) {
if (index != -1) {
I currentIterator = get(index);
if (currentIterator.hasNext()) {
return true;
}
}
int next = index + 1;
if (next < iterators.length) {
moveTo(next);
} else {
return false;
}
}
}

@Override
public T next() {
while (true) {
if (index != -1) {
I currentIterator = get(index);
if (currentIterator.hasNext()) {
return currentIterator.next();
}
}
int next = index + 1;
if (next < iterators.length) {
moveTo(next);
} else {
return null;
}
}
}

protected void moveTo(int index) {
this.index = index;
}

protected I get(int index) {
return iterators[index];
}
}
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;

/**
* Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset.
* It achieves this by going back to the first iterator, and as moves to another iterator it resets it.
*
* @param <T> type of the class of the iterator.
*/
public class MultiResettableIterator<T> extends MultiIteratorBase<T, ResettableIterator<T>> implements ResettableIterator<T> {

public MultiResettableIterator(ResettableIterator<T>[] iterators) {
super(iterators);
}

@Override
protected void moveTo(int index) {
super.moveTo(index);
if (index > -1) {
get(index).reset();
}
}

@Override
public void reset() {
moveTo(-1);
}
}