From 86e8bbed8d65afff6dd3c55ce76407870497cf60 Mon Sep 17 00:00:00 2001 From: Benoit TELLIER Date: Fri, 27 Mar 2026 15:08:39 +0100 Subject: [PATCH] JAMES-4091 Paginate and sort connected users --- .../servers/partials/operate/webadmin.adoc | 25 +- .../webadmin/webadmin-protocols/pom.xml | 44 +++ .../webadmin/ProtocolServerRoutes.java | 110 ++++++- .../ProtocolServerRoutesChannelsTest.java | 290 ++++++++++++++++++ .../src/test/resources/imapServer.xml | 16 + 5 files changed, 479 insertions(+), 6 deletions(-) create mode 100644 server/protocols/webadmin/webadmin-protocols/src/test/java/org/apache/james/protocols/webadmin/ProtocolServerRoutesChannelsTest.java create mode 100644 server/protocols/webadmin/webadmin-protocols/src/test/resources/imapServer.xml diff --git a/docs/modules/servers/partials/operate/webadmin.adoc b/docs/modules/servers/partials/operate/webadmin.adoc index a707c3c20e7..7fcbcd3edbe 100644 --- a/docs/modules/servers/partials/operate/webadmin.adoc +++ b/docs/modules/servers/partials/operate/webadmin.adoc @@ -5251,6 +5251,7 @@ Will return a description and statistics for channels of a user: ] .... +The same optional query parameters as `GET /servers/channels` are supported (see below). === Listing all channels @@ -5288,4 +5289,26 @@ Will return a description and statistics for channels of all users: ] .... -Be warned that the output can be very large if a significant count of channels is opened. \ No newline at end of file +Be warned that the output can be very large if a significant count of channels is opened. + +The following optional query parameters are supported to sort and paginate results: + +[cols="~,~,~", options="header"] +|=== +| Parameter | Default | Description +| `limit` | unlimited | Maximum number of results to return. +| `offset` | `0` | Number of results to skip before returning. +| `sortBy` | none (no sort) | JSON path of the field to sort by. Supports top-level fields (`protocol`, `endpoint`, `remoteAddress`, `connectionDate`, `isActive`, `isOpen`, `isWritable`, `isEncrypted`, `username`) and nested keys inside `protocolSpecificInformation` using dot notation (e.g. `protocolSpecificInformation.cumulativeWrittenBytes`). Unknown paths are treated as an empty value without error. +| `sortDirection` | `asc` | Sort direction. Accepted values: `asc`, `desc` (case-insensitive). Only applied when `sortBy` is set. +| `sortType` | `alphabetical` | How to compare values. `alphabetical` uses natural string order; `numerical` parses values as long integers (non-numeric values sort as `0`). Only applied when `sortBy` is set. +|=== + +Example — list the 10 most data-hungry IMAP connections: + +.... +curl -XGET "/servers/channels?sortBy=protocolSpecificInformation.cumulativeWrittenBytes&sortType=numerical&sortDirection=desc&limit=10" +.... + +Return codes: + +- 200: the list of channels, possibly empty \ No newline at end of file diff --git a/server/protocols/webadmin/webadmin-protocols/pom.xml b/server/protocols/webadmin/webadmin-protocols/pom.xml index 65b20e7d405..e72a8f145ce 100644 --- a/server/protocols/webadmin/webadmin-protocols/pom.xml +++ b/server/protocols/webadmin/webadmin-protocols/pom.xml @@ -30,13 +30,57 @@ Finner grained management for protocols + + ${james.groupId} + apache-james-mailbox-api + test-jar + test + + + ${james.groupId} + apache-james-mailbox-memory + test + + + ${james.groupId} + apache-james-mailbox-memory + test-jar + test + + + ${james.groupId} + event-bus-api + test-jar + test + + + ${james.groupId} + james-server-core + test + ${james.groupId} james-server-data-api + + ${james.groupId} + james-server-protocols-imap4 + test + + + ${james.groupId} + james-server-protocols-library + ${james.groupId} james-server-protocols-library + test-jar + test + + + ${james.groupId} + james-server-testing + test ${james.groupId} diff --git a/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java b/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java index 087c8b3efdd..8bff24a53f9 100644 --- a/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java +++ b/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java @@ -22,10 +22,12 @@ import static org.apache.james.DisconnectorNotifier.AllUsersRequest.ALL_USERS_REQUEST; import java.time.Instant; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Stream; import jakarta.inject.Inject; @@ -155,15 +157,19 @@ public void define(Service service) { return Responses.returnNoContent(response); }); - service.get(SERVERS + "/channels", (request, response) -> OBJECT_MAPPER.writeValueAsString(connectionDescriptionSupplier.describeConnections() - .map(ConnectionDescriptionDTO::from) - .toList())); + service.get(SERVERS + "/channels", (request, response) -> { + ChannelsQueryParameters params = ChannelsQueryParameters.from(request); + return OBJECT_MAPPER.writeValueAsString(params.apply(connectionDescriptionSupplier.describeConnections() + .map(ConnectionDescriptionDTO::from)) + .toList()); + }); service.get(SERVERS + "/channels/:user", (request, response) -> { Username username = Username.of(request.params("user")); - return OBJECT_MAPPER.writeValueAsString(connectionDescriptionSupplier.describeConnections() + ChannelsQueryParameters params = ChannelsQueryParameters.from(request); + return OBJECT_MAPPER.writeValueAsString(params.apply(connectionDescriptionSupplier.describeConnections() .filter(connectionDescription -> connectionDescription.username().map(username::equals).orElse(false)) - .map(ConnectionDescriptionDTO::from) + .map(ConnectionDescriptionDTO::from)) .toList()); }); @@ -174,6 +180,100 @@ public void define(Service service) { .toList())); } + private static class ChannelsQueryParameters { + enum SortDirection { + ASC, + DESC + } + + enum SortType { + NUMERICAL, + ALPHABETICAL + } + + private final Optional limit; + private final long offset; + private final Optional sortBy; + private final SortDirection sortDirection; + private final SortType sortType; + + private ChannelsQueryParameters(Optional limit, long offset, Optional sortBy, + SortDirection sortDirection, SortType sortType) { + this.limit = limit; + this.offset = offset; + this.sortBy = sortBy; + this.sortDirection = sortDirection; + this.sortType = sortType; + } + + static ChannelsQueryParameters from(Request request) { + Optional limit = Optional.ofNullable(request.queryParams("limit")).map(Long::parseUnsignedLong); + long offset = Optional.ofNullable(request.queryParams("offset")).map(Long::parseUnsignedLong).orElse(0L); + Optional sortBy = Optional.ofNullable(request.queryParams("sortBy")); + SortDirection sortDirection = Optional.ofNullable(request.queryParams("sortDirection")) + .map(s -> SortDirection.valueOf(s.toUpperCase())) + .orElse(SortDirection.ASC); + SortType sortType = Optional.ofNullable(request.queryParams("sortType")) + .map(s -> SortType.valueOf(s.toUpperCase())) + .orElse(SortType.ALPHABETICAL); + return new ChannelsQueryParameters(limit, offset, sortBy, sortDirection, sortType); + } + + Stream apply(Stream stream) { + Stream result = stream; + if (sortBy.isPresent()) { + result = result.sorted(buildComparator(sortBy.get())); + } + if (offset > 0) { + result = result.skip(offset); + } + if (limit.isPresent()) { + result = result.limit(limit.get()); + } + return result; + } + + private Comparator buildComparator(String field) { + Comparator comparator; + if (sortType == SortType.NUMERICAL) { + comparator = Comparator.comparingLong(a -> extractNumeric(a, field)); + } else { + comparator = Comparator.comparing(a -> extractString(a, field)); + } + if (sortDirection == SortDirection.DESC) { + comparator = comparator.reversed(); + } + return comparator; + } + + private static long extractNumeric(ConnectionDescriptionDTO dto, String field) { + try { + return Long.parseLong(extractString(dto, field)); + } catch (NumberFormatException e) { + return 0L; + } + } + + private static String extractString(ConnectionDescriptionDTO dto, String field) { + if (field.startsWith("protocolSpecificInformation.")) { + String key = field.substring("protocolSpecificInformation.".length()); + return Optional.ofNullable(dto.protocolSpecificInformation().get(key)).orElse(""); + } + return switch (field) { + case "protocol" -> dto.protocol(); + case "endpoint" -> dto.endpoint(); + case "remoteAddress" -> dto.remoteAddress().orElse(""); + case "connectionDate" -> dto.connectionDate().map(Instant::toString).orElse(""); + case "isActive" -> String.valueOf(dto.isActive()); + case "isOpen" -> String.valueOf(dto.isOpen()); + case "isWritable" -> String.valueOf(dto.isWritable()); + case "isEncrypted" -> String.valueOf(dto.isEncrypted()); + case "username" -> dto.username().orElse(""); + default -> ""; + }; + } + } + private Predicate filters(Request request) { Optional port = Optional.ofNullable(request.queryParams("port")).map(Integer::parseUnsignedInt).map(Port::of); diff --git a/server/protocols/webadmin/webadmin-protocols/src/test/java/org/apache/james/protocols/webadmin/ProtocolServerRoutesChannelsTest.java b/server/protocols/webadmin/webadmin-protocols/src/test/java/org/apache/james/protocols/webadmin/ProtocolServerRoutesChannelsTest.java new file mode 100644 index 00000000000..c2c16beafaa --- /dev/null +++ b/server/protocols/webadmin/webadmin-protocols/src/test/java/org/apache/james/protocols/webadmin/ProtocolServerRoutesChannelsTest.java @@ -0,0 +1,290 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.protocols.webadmin; + +import static io.restassured.RestAssured.given; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; + +import java.time.Duration; + +import org.apache.james.DisconnectorNotifier; +import org.apache.james.core.Username; +import org.apache.james.imap.encode.main.DefaultImapEncoderFactory; +import org.apache.james.imap.main.DefaultImapDecoderFactory; +import org.apache.james.imap.processor.fetch.FetchProcessor; +import org.apache.james.imap.processor.main.DefaultImapProcessorFactory; +import org.apache.james.imapserver.netty.IMAPServer; +import org.apache.james.imapserver.netty.ImapMetrics; +import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources; +import org.apache.james.mailbox.store.FakeAuthenticator; +import org.apache.james.mailbox.store.FakeAuthorizator; +import org.apache.james.mailbox.store.StoreSubscriptionManager; +import org.apache.james.metrics.api.NoopGaugeRegistry; +import org.apache.james.metrics.tests.RecordingMetricFactory; +import org.apache.james.protocols.lib.mock.ConfigLoader; +import org.apache.james.server.core.filesystem.FileSystemImpl; +import org.apache.james.util.ClassLoaderUtils; +import org.apache.james.utils.TestIMAPClient; +import org.apache.james.webadmin.WebAdminServer; +import org.apache.james.webadmin.WebAdminUtils; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.google.common.collect.ImmutableSet; + +import io.restassured.specification.RequestSpecification; + +class ProtocolServerRoutesChannelsTest { + private static final Username ALICE = Username.of("alice@domain.org"); + private static final Username BOB = Username.of("bob@domain.org"); + private static final String PASSWORD = "secret"; + + @RegisterExtension + TestIMAPClient aliceClient = new TestIMAPClient(); + @RegisterExtension + TestIMAPClient bobClient = new TestIMAPClient(); + + private IMAPServer imapServer; + private WebAdminServer webAdminServer; + private RequestSpecification spec; + private int imapPort; + + @BeforeEach + void setUp() throws Exception { + FakeAuthenticator authenticator = new FakeAuthenticator(); + authenticator.addUser(ALICE, PASSWORD); + authenticator.addUser(BOB, PASSWORD); + + InMemoryIntegrationResources resources = InMemoryIntegrationResources.builder() + .authenticator(authenticator) + .authorizator(FakeAuthorizator.defaultReject()) + .inVmEventBus() + .defaultAnnotationLimits() + .defaultMessageParser() + .scanningSearchIndex() + .noPreDeletionHooks() + .storeQuotaManager() + .build(); + + RecordingMetricFactory metricFactory = new RecordingMetricFactory(); + imapServer = new IMAPServer( + new DefaultImapDecoderFactory().buildImapDecoder(), + new DefaultImapEncoderFactory().buildImapEncoder(), + DefaultImapProcessorFactory.createXListSupportingProcessor( + resources.getMailboxManager(), + resources.getEventBus(), + new StoreSubscriptionManager( + resources.getMailboxManager().getMapperFactory(), + resources.getMailboxManager().getMapperFactory(), + resources.getEventBus()), + null, + resources.getQuotaManager(), + resources.getQuotaRootResolver(), + metricFactory, + FetchProcessor.LocalCacheConfiguration.DEFAULT), + new ImapMetrics(metricFactory), + new NoopGaugeRegistry(), + ImmutableSet.of()); + + imapServer.setFileSystem(FileSystemImpl.forTestingWithConfigurationFromClasspath()); + imapServer.configure(ConfigLoader.getConfig(ClassLoaderUtils.getSystemResourceAsSharedStream("imapServer.xml"))); + imapServer.init(); + + imapPort = imapServer.getListenAddresses().get(0).getPort(); + + DisconnectorNotifier disconnectorNotifier = new DisconnectorNotifier.InVMDisconnectorNotifier(imapServer); + webAdminServer = WebAdminUtils.createWebAdminServer( + new ProtocolServerRoutes(ImmutableSet.of(), disconnectorNotifier, imapServer)) + .start(); + spec = WebAdminUtils.spec(webAdminServer.getPort()); + } + + @AfterEach + void tearDown() { + imapServer.destroy(); + webAdminServer.destroy(); + } + + @Test + void getChannelsShouldReturnEmptyWhenNoConnections() { + given(spec).get("/servers/channels") + .then().statusCode(200).body("", hasSize(0)); + } + + @Test + void getChannelsShouldListLoggedInUser() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + + awaitChannelCount(1); + + given(spec).get("/servers/channels") + .then().statusCode(200) + .body("[0].username", equalTo(ALICE.asString())) + .body("[0].protocol", equalTo("IMAP")); + } + + @Test + void getChannelsByUserShouldFilterByUsername() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + bobClient.connect("127.0.0.1", imapPort).login(BOB, PASSWORD); + + awaitChannelCount(2); + + given(spec).get("/servers/channels/" + ALICE.asString()) + .then().statusCode(200) + .body("", hasSize(1)) + .body("[0].username", equalTo(ALICE.asString())); + + given(spec).get("/servers/channels/" + BOB.asString()) + .then().statusCode(200) + .body("", hasSize(1)) + .body("[0].username", equalTo(BOB.asString())); + } + + @Test + void getConnectedUsersShouldListDistinctLoggedInUsers() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + bobClient.connect("127.0.0.1", imapPort).login(BOB, PASSWORD); + + awaitChannelCount(2); + + given(spec).get("/servers/connectedUsers") + .then().statusCode(200) + .body("", hasSize(2)) + .body("", hasItem(ALICE.asString())) + .body("", hasItem(BOB.asString())); + } + + @Test + void deleteChannelsByUserShouldDisconnectUser() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + bobClient.connect("127.0.0.1", imapPort).login(BOB, PASSWORD); + + awaitChannelCount(2); + + given(spec).delete("/servers/channels/" + ALICE.asString()) + .then().statusCode(204); + + Awaitility.await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> + given(spec).get("/servers/channels") + .then().body("username", hasItem(BOB.asString())) + .body("username.flatten()", org.hamcrest.Matchers.not(hasItem(ALICE.asString())))); + } + + @Test + void deleteAllChannelsShouldDisconnectEveryone() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + bobClient.connect("127.0.0.1", imapPort).login(BOB, PASSWORD); + + awaitChannelCount(2); + + given(spec).delete("/servers/channels") + .then().statusCode(204); + + Awaitility.await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> + given(spec).get("/servers/channels") + .then().body("", hasSize(0))); + } + + @Test + void limitShouldRestrictNumberOfResults() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + bobClient.connect("127.0.0.1", imapPort).login(BOB, PASSWORD); + + awaitChannelCount(2); + + given(spec).get("/servers/channels?limit=1") + .then().statusCode(200).body("", hasSize(1)); + } + + @Test + void offsetShouldSkipResults() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + bobClient.connect("127.0.0.1", imapPort).login(BOB, PASSWORD); + + awaitChannelCount(2); + + given(spec).get("/servers/channels?offset=1") + .then().statusCode(200).body("", hasSize(1)); + } + + @Test + void sortByShouldOrderResultsAlphabetically() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + bobClient.connect("127.0.0.1", imapPort).login(BOB, PASSWORD); + + awaitChannelCount(2); + + assertThat( + given(spec).get("/servers/channels?sortBy=username&sortDirection=asc") + .then().statusCode(200).extract().jsonPath().getList("username")) + .isSortedAccordingTo(String::compareTo); + } + + @Test + void sortByDescShouldReverseOrder() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + bobClient.connect("127.0.0.1", imapPort).login(BOB, PASSWORD); + + awaitChannelCount(2); + + assertThat( + given(spec).get("/servers/channels?sortBy=username&sortDirection=desc") + .then().statusCode(200).extract().jsonPath().getList("username")) + .isSortedAccordingTo((a, b) -> b.compareTo(a)); + } + + @Test + void sortByProtocolSpecificInformationShouldSortNumerically() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + bobClient.connect("127.0.0.1", imapPort).login(BOB, PASSWORD); + + awaitChannelCount(2); + + // cumulativeWrittenBytes is a numeric field in protocolSpecificInformation + given(spec).get("/servers/channels?sortBy=protocolSpecificInformation.cumulativeWrittenBytes&sortType=numerical&sortDirection=asc") + .then().statusCode(200).body("", hasSize(2)); + } + + @Test + void unknownSortByShouldReturnResultsWithoutError() throws Exception { + aliceClient.connect("127.0.0.1", imapPort).login(ALICE, PASSWORD); + + awaitChannelCount(1); + + given(spec).get("/servers/channels?sortBy=unknownField") + .then().statusCode(200).body("", hasSize(1)); + } + + private void awaitChannelCount(int expected) { + Awaitility.await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> + given(spec).get("/servers/channels") + .then().body("", hasSize(expected))); + } +} diff --git a/server/protocols/webadmin/webadmin-protocols/src/test/resources/imapServer.xml b/server/protocols/webadmin/webadmin-protocols/src/test/resources/imapServer.xml new file mode 100644 index 00000000000..458288c3ce0 --- /dev/null +++ b/server/protocols/webadmin/webadmin-protocols/src/test/resources/imapServer.xml @@ -0,0 +1,16 @@ + + + imapserver + 0.0.0.0:0 + 200 + 0 + 0 + 120 + SECONDS + true + 64K + 128K + false + false + 20 +