Skip to content

Commit

Permalink
[improve][transactions] Add command to list transaction coordinators url
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Sep 7, 2022
1 parent eab2bb5 commit ede029b
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 0 deletions.
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
Expand All @@ -48,6 +49,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
Expand All @@ -69,6 +71,37 @@
@Slf4j
public abstract class TransactionsBase extends AdminResource {

protected void internalListCoordinators(AsyncResponse asyncResponse) {
final PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
} catch (PulsarServerException ex) {
asyncResponse.resume(new RestException(ex));
return;
}
admin.lookups()
.lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
.thenAccept(map -> {
if (map.isEmpty()) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
"Transaction coordinator not found"));
return;
}
Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
map.forEach((topicPartition, brokerServiceUrl) -> {
final int coordinatorId = TopicName.getPartitionIndex(topicPartition);
result.put(coordinatorId, new TransactionCoordinatorInfo(brokerServiceUrl));
});
asyncResponse.resume(result);

})
.exceptionally(ex -> {
log.error("[{}] Failed to list transaction coordinators.", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
Integer coordinatorId) {
if (coordinatorId != null) {
Expand Down
Expand Up @@ -52,6 +52,17 @@
@Slf4j
public class Transactions extends TransactionsBase {

@GET
@Path("/coordinators")
@ApiOperation(value = "List transaction coordinators.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "This Broker is not "
+ "configured with transactionCoordinatorEnabled=true.")})
public void listCoordinators(@Suspended final AsyncResponse asyncResponse) {
checkTransactionCoordinatorEnabled();
internalListCoordinators(asyncResponse);
}

@GET
@Path("/coordinatorStats")
@ApiOperation(value = "Get transaction coordinator stats.")
Expand Down
Expand Up @@ -61,6 +61,7 @@
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
Expand Down Expand Up @@ -102,6 +103,18 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 20000)
public void testListTransactionCoordinators() throws Exception {
initTransaction(4);
final Map<Integer, TransactionCoordinatorInfo> result = admin
.transactions().listTransactionCoordinatorsAsync().get();
assertEquals(result.size(), 4);
final String expectedUrl = pulsar.getBrokerServiceUrl();
for (int i = 0; i < 4; i++) {
assertEquals(result.get(i).getBrokerServiceUrl(), expectedUrl);
}
}

@Test(timeOut = 20000)
public void testGetTransactionCoordinatorStats() throws Exception {
initTransaction(2);
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
Expand All @@ -34,6 +35,21 @@

public interface Transactions {

/**
* List transaction coordinators.
*
* @return the transaction coordinators list.
*/
Map<Integer, TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException;

/**
* List transaction coordinators.
*
* @return the future of the transaction coordinators list.
*/
CompletableFuture<Map<Integer, TransactionCoordinatorInfo>> listTransactionCoordinatorsAsync();


/**
* Get transaction metadataStore stats.
*
Expand Down
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
* Transaction coordinator information.
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class TransactionCoordinatorInfo {
private String brokerServiceUrl;
}
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
Expand All @@ -48,6 +49,17 @@ public TransactionsImpl(WebTarget web, Authentication auth, long readTimeoutMs)
adminV3Transactions = web.path("/admin/v3/transactions");
}

@Override
public Map<Integer, TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException {
return sync(() -> listTransactionCoordinatorsAsync());
}

@Override
public CompletableFuture<Map<Integer, TransactionCoordinatorInfo>> listTransactionCoordinatorsAsync() {
WebTarget path = adminV3Transactions.path("coordinators");
return asyncGetRequest(path, new FutureCallback<Map<Integer, TransactionCoordinatorInfo>>(){});
}

@Override
public CompletableFuture<TransactionCoordinatorStats> getCoordinatorStatsByIdAsync(int coordinatorId) {
WebTarget path = adminV3Transactions.path("coordinatorStats");
Expand Down
Expand Up @@ -2285,6 +2285,10 @@ void transactions() throws Exception {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("position-stats-in-pending-ack -t test -s test -l 1 -e 1 -b 1"));
verify(transactions).getPositionStatsInPendingAck("test", "test", 1L, 1L, 1);

cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("coordinators-list"));
verify(transactions).listTransactionCoordinators();
}

@Test
Expand Down
Expand Up @@ -219,6 +219,14 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "List transaction coordinators")
private class ListTransactionCoordinators extends CliCommand {
@Override
void run() throws Exception {
print(getAdmin().transactions().listTransactionCoordinators());
}
}


public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
Expand All @@ -233,6 +241,7 @@ public CmdTransactions(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("slow-transactions", new GetSlowTransactions());
jcommander.addCommand("scale-transactionCoordinators", new ScaleTransactionCoordinators());
jcommander.addCommand("position-stats-in-pending-ack", new GetPositionStatsInPendingAck());
jcommander.addCommand("coordinators-list", new ListTransactionCoordinators());

}
}

0 comments on commit ede029b

Please sign in to comment.