Skip to content

Commit

Permalink
Fix PR #7818 #7802 does not support admin cli (#7940)
Browse files Browse the repository at this point in the history
### Motivation
PR #7818 #7802 supports topic-level policies.
But the pulsar admin cli java doc is not supported accordingly.
  • Loading branch information
315157973 authored Sep 3, 2020
1 parent 7e4ace2 commit 8377396
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,20 @@ public void topics() throws Exception {
cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);

cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1 -m 999"));
verify(mockTopics).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);

cmdTopics.run(split("get-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);

// argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
class TimestampMatcher implements ArgumentMatcher<Long> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ public CmdTopics(PulsarAdmin admin) {
jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
jcommander.addCommand("get-max-unacked-messages-on-consumer", new GetMaxUnackedMessagesOnConsumer());
jcommander.addCommand("set-max-unacked-messages-on-consumer", new SetMaxUnackedMessagesOnConsumer());
jcommander.addCommand("remove-max-unacked-messages-on-consumer", new RemoveMaxUnackedMessagesOnConsumer());
jcommander.addCommand("get-max-unacked-messages-on-subscription", new GetMaxUnackedMessagesOnSubscription());
jcommander.addCommand("set-max-unacked-messages-on-subscription", new SetMaxUnackedMessagesOnSubscription());
jcommander.addCommand("remove-max-unacked-messages-on-subscription", new RemoveMaxUnackedMessagesOnSubscription());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
Expand Down Expand Up @@ -1170,6 +1176,84 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get max unacked messages policy on consumer for a topic")
private class GetMaxUnackedMessagesOnConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(admin.topics().getMaxUnackedMessagesOnConsumer(persistentTopic));
}
}

@Parameters(commandDescription = "Remove max unacked messages policy on consumer for a topic")
private class RemoveMaxUnackedMessagesOnConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().removeMaxUnackedMessagesOnConsumer(persistentTopic);
}
}

@Parameters(commandDescription = "Set max unacked messages policy on consumer for a topic")
private class SetMaxUnackedMessagesOnConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true)
private int maxNum;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum);
}
}

@Parameters(commandDescription = "Get max unacked messages policy on subscription for a topic")
private class GetMaxUnackedMessagesOnSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(admin.topics().getMaxUnackedMessagesOnSubscription(persistentTopic));
}
}

@Parameters(commandDescription = "Remove max unacked messages policy on subscription for a topic")
private class RemoveMaxUnackedMessagesOnSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().removeMaxUnackedMessagesOnSubscription(persistentTopic);
}
}

@Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic")
private class SetMaxUnackedMessagesOnSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true)
private int maxNum;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum);
}
}

@Parameters(commandDescription = "Get compaction threshold for a topic")
private class GetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
Expand Down

0 comments on commit 8377396

Please sign in to comment.