Skip to content

Commit

Permalink
Address pr's comment and rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Oct 21, 2022
1 parent f87603f commit f3f9c78
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 16 deletions.
Expand Up @@ -78,24 +78,36 @@ protected void internalListCoordinators(AsyncResponse asyncResponse) {
asyncResponse.resume(new RestException(ex));
return;
}
Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
admin.lookups()
.lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
.thenAccept(map -> {
if (map.isEmpty()) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
"Transaction coordinator not found"));
return;
}
Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
.thenCompose(map -> {
map.forEach((topicPartition, brokerServiceUrl) -> {
final int coordinatorId = TopicName.getPartitionIndex(topicPartition);
result.put(coordinatorId, new TransactionCoordinatorInfo(brokerServiceUrl));
result.put(coordinatorId, new TransactionCoordinatorInfo(coordinatorId, brokerServiceUrl));
});
asyncResponse.resume(result);

return getPulsarResources()
.getTopicResources()
.getExistingPartitions(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
})
.thenAccept(allPartitions -> {
allPartitions
.stream()
.filter(partition -> SystemTopicNames
.isTransactionCoordinatorAssign(TopicName.get(partition)))
.forEach(partition -> {
final int coordinatorId = TopicName.getPartitionIndex(partition);
if (!result.containsKey(coordinatorId)) {
result.put(coordinatorId,
new TransactionCoordinatorInfo(coordinatorId, null));
}
});
asyncResponse.resume(result.values());
})
.exceptionally(ex -> {
log.error("[{}] Failed to list transaction coordinators.", clientAppId(), ex);
log.error("[{}] Failed to list transaction coordinators: {}",
clientAppId(), ex.getMessage(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
Expand Down
Expand Up @@ -113,8 +113,9 @@ protected void cleanup() throws Exception {
@Test(timeOut = 20000)
public void testListTransactionCoordinators() throws Exception {
initTransaction(4);
final Map<Integer, TransactionCoordinatorInfo> result = admin
final List<TransactionCoordinatorInfo> result = admin
.transactions().listTransactionCoordinatorsAsync().get();
System.out.println("result" + result);
assertEquals(result.size(), 4);
final String expectedUrl = pulsar.getBrokerServiceUrl();
for (int i = 0; i < 4; i++) {
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,14 +41,14 @@ public interface Transactions {
*
* @return the transaction coordinators list.
*/
Map<Integer, TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException;
List<TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException;

/**
* List transaction coordinators.
*
* @return the future of the transaction coordinators list.
*/
CompletableFuture<Map<Integer, TransactionCoordinatorInfo>> listTransactionCoordinatorsAsync();
CompletableFuture<List<TransactionCoordinatorInfo>> listTransactionCoordinatorsAsync();


/**
Expand Down
Expand Up @@ -22,13 +22,16 @@
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
* Transaction coordinator information.
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransactionCoordinatorInfo {
private long id;
private String brokerServiceUrl;
}
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.client.admin.internal;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -50,14 +52,14 @@ public TransactionsImpl(WebTarget web, Authentication auth, long readTimeoutMs)
}

@Override
public Map<Integer, TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException {
public List<TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException {
return sync(() -> listTransactionCoordinatorsAsync());
}

@Override
public CompletableFuture<Map<Integer, TransactionCoordinatorInfo>> listTransactionCoordinatorsAsync() {
public CompletableFuture<List<TransactionCoordinatorInfo>> listTransactionCoordinatorsAsync() {
WebTarget path = adminV3Transactions.path("coordinators");
return asyncGetRequest(path, new FutureCallback<Map<Integer, TransactionCoordinatorInfo>>(){});
return asyncGetRequest(path, new FutureCallback<List<TransactionCoordinatorInfo>>(){});
}

@Override
Expand Down

0 comments on commit f3f9c78

Please sign in to comment.