Skip to content

Commit

Permalink
This closes #3484
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Mar 10, 2021
2 parents 6bff20f + 20007ad commit 4f5821d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -847,9 +847,11 @@ public Map<String, Object>[] listMessages(final String filterStr) throws Excepti
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor();
final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
int count = 0;
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext()) {
while (iterator.hasNext() && count++ < limit) {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
Message message = ref.getMessage();
Expand Down Expand Up @@ -983,9 +985,11 @@ private Map<String, Long> internalCountMessages(final String filterStr, final St
if (filter == null && groupByProperty == null) {
result.put(null, getMessageCount());
} else {
final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
int count = 0;
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext()) {
while (iterator.hasNext() && count++ < limit) {
Message message = iterator.next().getMessage();
internalComputeMessage(result, filter, groupByProperty, message);
}
Expand Down Expand Up @@ -1593,14 +1597,14 @@ public CompositeData[] browse(String filter) throws Exception {

clearIO();
try {
int pageSize = addressSettingsRepository.getMatch(queue.getName().toString()).getManagementBrowsePageSize();
int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
int currentPageSize = 0;
ArrayList<CompositeData> c = new ArrayList<>();
Filter thefilter = FilterImpl.createFilter(filter);
queue.flushExecutor();
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext() && currentPageSize++ < pageSize) {
while (iterator.hasNext() && currentPageSize++ < limit) {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
c.add(OpenTypeSupport.convert(ref));
Expand Down
4 changes: 2 additions & 2 deletions docs/user-manual/en/address-model.md
Original file line number Diff line number Diff line change
Expand Up @@ -922,8 +922,8 @@ config reload, by delete policy: `OFF` or `FORCE`. Default is `OFF`. Read more
about [configuration reload](config-reload.md).

`management-browse-page-size` is the number of messages a management resource
can browse. This is relevant for the "browse" management method exposed on the
queue control. Default is `200`.
can browse. This is relevant for the `browse, list and count-with-filter` management
methods exposed on the queue control. Default is `200`.

`default-purge-on-no-consumers` defines a queue's default
`purge-on-no-consumers` setting if none is provided on the queue itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3331,6 +3331,40 @@ public void testSendMessageWithProperties() throws Exception {
Assert.assertEquals(new String(body), "theBody");
}

@Test
public void testBrowseLimitOnListBrowseAndFilteredCount() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();

AddressSettings addressSettings = new AddressSettings().setManagementBrowsePageSize(5);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);

session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));

ClientProducer producer = session.createProducer(address);
for (int i = 0; i < 10; i++) {
producer.send(session.createMessage(true));
}
producer.close();

QueueControl queueControl = createManagementControl(address, queue);


// no filter, delegates to count metric
Wait.assertEquals(10, queueControl::getMessageCount);

assertEquals(5, queueControl.browse().length);
assertEquals(5, queueControl.listMessages("").length);

JsonArray array = JsonUtil.readJsonArray(queueControl.listMessagesAsJSON(""));
assertEquals(5, array.size());

// filer could match all
assertEquals(5, queueControl.countMessages("AMQSize > 0"));

session.deleteQueue(queue);
}

@Test
public void testResetGroups() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
Expand Down

0 comments on commit 4f5821d

Please sign in to comment.