Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1017,4 +1017,13 @@ void createSubscription(String topic, String subscriptionName, MessageId message
* @return the status of the offload operation
*/
OffloadProcessStatus offloadStatus(String topic) throws PulsarAdminException;

/**
* Get the last commit message Id of a topic
*
* @param topic the topic name
* @return
* @throws PulsarAdminException
*/
MessageId getLastMessageId(String topic) throws PulsarAdminException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -950,5 +950,37 @@ private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String ms
return ret;
}

@Override
public MessageId getLastMessageId(String topic) throws PulsarAdminException {
try {
return (MessageIdImpl) getLastMessageIdAsync(topic).get();
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e.getCause());
}
}

public CompletableFuture<MessageId> getLastMessageIdAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "lastMessageId");
final CompletableFuture<MessageId> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<MessageIdImpl>() {

@Override
public void completed(MessageIdImpl response) {
future.complete(response);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public CmdTopics(PulsarAdmin admin) {
jcommander.addCommand("compaction-status", new CompactionStatusCmd());
jcommander.addCommand("offload", new Offload());
jcommander.addCommand("offload-status", new OffloadStatusCmd());
jcommander.addCommand("last-message-id", new GetLastMessageId());
}

@Parameters(commandDescription = "Get the list of topics under a namespace.")
Expand Down Expand Up @@ -708,4 +709,16 @@ void run() throws PulsarAdminException {
}
}
}

@Parameters(commandDescription = "get the last commit message id of topic")
private class GetLastMessageId extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(topics.getLastMessageId(persistentTopic));
}
}
}