Skip to content

Commit

Permalink
[Broker] Fix call sync method in async rest api for internalGetManage…
Browse files Browse the repository at this point in the history
…dLedgerInfo (#13847)
  • Loading branch information
mattisonchao committed Jan 24, 2022
1 parent e50493e commit a805cba
Showing 1 changed file with 93 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1213,103 +1213,110 @@ protected PersistentTopicInternalStats internalGetInternalStats(boolean authorit
}

protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}

// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<String>> futures = Lists.newArrayList();

PartitionedManagedLedgerInfo partitionedManagedLedgerInfo = new PartitionedManagedLedgerInfo();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.getInternalInfoAsync(
topicNamePartition.toString()).whenComplete((response, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get managed info for {}",
clientAppId(), topicNamePartition, throwable);
asyncResponse.resume(new RestException(throwable));
}
try {
partitionedManagedLedgerInfo.partitions.put(topicNamePartition.toString(),
jsonMapper().readValue(response,
ManagedLedgerInfo.class));
} catch (JsonProcessingException ex) {
log.error("[{}] Failed to parse ManagedLedgerInfo for {} from [{}]",
clientAppId(),
topicNamePartition, response, ex);
}
}));
} catch (Exception e) {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicNamePartition, e);
throw new RestException(e);
future = CompletableFuture.completedFuture(null);
}
future.thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<String>> futures =
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
PartitionedManagedLedgerInfo partitionedManagedLedgerInfo = new PartitionedManagedLedgerInfo();
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.getInternalInfoAsync(topicNamePartition.toString())
.whenComplete((response, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get managed info for {}",
clientAppId(), topicNamePartition, throwable);
asyncResponse.resume(new RestException(throwable));
}
try {
partitionedManagedLedgerInfo.partitions
.put(topicNamePartition.toString(), jsonMapper()
.readValue(response, ManagedLedgerInfo.class));
} catch (JsonProcessingException ex) {
log.error("[{}] Failed to parse ManagedLedgerInfo for {} from [{}]",
clientAppId(), topicNamePartition, response, ex);
}
})
);
} catch (PulsarServerException e) {
log.error("[{}] Failed to get admin client while get managed info for {}" ,
clientAppId(), topicNamePartition, e);
throw new RestException(e);
}
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
} else {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, t);
asyncResponse.resume(new RestException(t));
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
} else {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, t);
asyncResponse.resume(new RestException(t));
}
}
}
asyncResponse.resume((StreamingOutput) output -> {
jsonMapper().writer().writeValue(output, partitionedManagedLedgerInfo);
asyncResponse.resume((StreamingOutput) output -> {
jsonMapper().writer().writeValue(output, partitionedManagedLedgerInfo);
});
return null;
});
return null;
});
} else {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
}
}).exceptionally(ex -> {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
} else {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
}
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get partitioned metadata while get managed info for {}",
clientAppId(), topicName, cause);
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to validate the global namespace ownership while get managed info for {}",
clientAppId(), topicName, cause);
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

protected void internalGetManagedLedgerInfoForNonPartitionedTopic(AsyncResponse asyncResponse) {
String managedLedger;
try {
validateTopicOperation(topicName, TopicOperation.GET_STATS);
managedLedger = topicName.getPersistenceNamingEncoding();
} catch (Exception e) {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
pulsar().getManagedLedgerFactory().asyncGetManagedLedgerInfo(managedLedger, new ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
asyncResponse.resume((StreamingOutput) output -> {
jsonMapper().writer().writeValue(output, info);
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
.thenAccept(__ -> {
String managedLedger = topicName.getPersistenceNamingEncoding();
pulsar().getManagedLedgerFactory()
.asyncGetManagedLedgerInfo(managedLedger, new ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
asyncResponse.resume((StreamingOutput) output -> {
jsonMapper().writer().writeValue(output, info);
});
}
@Override
public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(exception);
}
}, null);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, cause);
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

@Override
public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(exception);
}
}, null);
}

protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition,
Expand Down

0 comments on commit a805cba

Please sign in to comment.