diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java index dbd70feb1c052f..f6e7da77b0382a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java @@ -39,6 +39,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.Transactions; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.naming.NamespaceName; @@ -47,6 +48,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.TransactionBufferStats; +import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo; import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; @@ -68,6 +70,37 @@ @Slf4j public abstract class TransactionsBase extends AdminResource { + protected void internalListCoordinators(AsyncResponse asyncResponse) { + final PulsarAdmin admin; + try { + admin = pulsar().getAdminClient(); + } catch (PulsarServerException ex) { + asyncResponse.resume(new RestException(ex)); + return; + } + admin.lookups() + .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName()) + .thenAccept(map -> { + if (map.isEmpty()) { + asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, + "Transaction coordinator not found")); + return; + } + Map result = new HashMap<>(); + map.forEach((topicPartition, brokerServiceUrl) -> { + final int coordinatorId = TopicName.getPartitionIndex(topicPartition); + result.put(coordinatorId, new TransactionCoordinatorInfo(brokerServiceUrl)); + }); + asyncResponse.resume(result); + + }) + .exceptionally(ex -> { + log.error("[{}] Failed to list transaction coordinators.", clientAppId(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative, Integer coordinatorId) { if (coordinatorId != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java index 5dcf57f80f97cf..3add71fe49670b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java @@ -52,6 +52,17 @@ @Slf4j public class Transactions extends TransactionsBase { + @GET + @Path("/coordinators") + @ApiOperation(value = "List transaction coordinators.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 503, message = "This Broker is not " + + "configured with transactionCoordinatorEnabled=true.")}) + public void listCoordinators(@Suspended final AsyncResponse asyncResponse) { + checkTransactionCoordinatorEnabled(); + internalListCoordinators(asyncResponse); + } + @GET @Path("/coordinatorStats") @ApiOperation(value = "Get transaction coordinator stats.") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 8c002080abe18e..75353b07eeba7c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -62,6 +62,7 @@ import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TransactionBufferStats; +import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo; import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; @@ -109,6 +110,18 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test(timeOut = 20000) + public void testListTransactionCoordinators() throws Exception { + initTransaction(4); + final Map result = admin + .transactions().listTransactionCoordinatorsAsync().get(); + assertEquals(result.size(), 4); + final String expectedUrl = pulsar.getBrokerServiceUrl(); + for (int i = 0; i < 4; i++) { + assertEquals(result.get(i).getBrokerServiceUrl(), expectedUrl); + } + } + @Test(timeOut = 20000) public void testGetTransactionCoordinatorStats() throws Exception { initTransaction(2); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java index 3bd5a188dced90..2bde11af5d842d 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.policies.data.TransactionBufferStats; +import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo; import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; @@ -34,6 +35,21 @@ public interface Transactions { + /** + * List transaction coordinators. + * + * @return the transaction coordinators list. + */ + Map listTransactionCoordinators() throws PulsarAdminException; + + /** + * List transaction coordinators. + * + * @return the future of the transaction coordinators list. + */ + CompletableFuture> listTransactionCoordinatorsAsync(); + + /** * Get transaction metadataStore stats. * diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInfo.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInfo.java new file mode 100644 index 00000000000000..ab0464055baee8 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInfo.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * Transaction coordinator information. + */ +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class TransactionCoordinatorInfo { + private String brokerServiceUrl; +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java index 6af80344287cf4..8e141102412773 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TransactionBufferStats; +import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo; import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; @@ -48,6 +49,17 @@ public TransactionsImpl(WebTarget web, Authentication auth, long readTimeoutMs) adminV3Transactions = web.path("/admin/v3/transactions"); } + @Override + public Map listTransactionCoordinators() throws PulsarAdminException { + return sync(() -> listTransactionCoordinatorsAsync()); + } + + @Override + public CompletableFuture> listTransactionCoordinatorsAsync() { + WebTarget path = adminV3Transactions.path("coordinators"); + return asyncGetRequest(path, new FutureCallback>(){}); + } + @Override public CompletableFuture getCoordinatorStatsByIdAsync(int coordinatorId) { WebTarget path = adminV3Transactions.path("coordinatorStats"); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index bcb1fca1705e5c..054aca4e0e12b9 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -2294,6 +2294,10 @@ void transactions() throws Exception { cmdTransactions = new CmdTransactions(() -> admin); cmdTransactions.run(split("position-stats-in-pending-ack -t test -s test -l 1 -e 1 -b 1")); verify(transactions).getPositionStatsInPendingAck("test", "test", 1L, 1L, 1); + + cmdTransactions = new CmdTransactions(() -> admin); + cmdTransactions.run(split("coordinators-list")); + verify(transactions).listTransactionCoordinators(); } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java index a79966035cdedc..12893adbdac1f7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java @@ -219,6 +219,14 @@ void run() throws Exception { } } + @Parameters(commandDescription = "List transaction coordinators") + private class ListTransactionCoordinators extends CliCommand { + @Override + void run() throws Exception { + print(getAdmin().transactions().listTransactionCoordinators()); + } + } + public CmdTransactions(Supplier admin) { super("transactions", admin); @@ -233,6 +241,7 @@ public CmdTransactions(Supplier admin) { jcommander.addCommand("slow-transactions", new GetSlowTransactions()); jcommander.addCommand("scale-transactionCoordinators", new ScaleTransactionCoordinators()); jcommander.addCommand("position-stats-in-pending-ack", new GetPositionStatsInPendingAck()); + jcommander.addCommand("coordinators-list", new ListTransactionCoordinators()); } }