Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support get topic applied policy for Retention #9362

Merged
merged 9 commits into from Feb 7, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2538,6 +2538,9 @@ private void validatePolicies(NamespaceName ns, Policies policies) {
}

protected void validateRetentionPolicies(RetentionPolicies retention) {
if (retention == null) {
return;
}
checkArgument(retention.getRetentionSizeInMB() >= -1,
"Invalid retention policy: size limit must be >= -1");
checkArgument(retention.getRetentionTimeInMinutes() >= -1,
Expand Down
Expand Up @@ -2636,15 +2636,19 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
}

protected void internalGetRetention(AsyncResponse asyncResponse){
protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied){
preValidation();
Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
.map(TopicPolicies::getRetentionPolicies);
if (!retention.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(retention.get());
}
RetentionPolicies retentionPolicies = getTopicPolicies(topicName)
.map(TopicPolicies::getRetentionPolicies).orElseGet(() -> {
if (applied) {
RetentionPolicies policies = getNamespacePolicies(namespaceName).retention_policies;
return policies == null ? new RetentionPolicies(
config().getDefaultRetentionTimeInMinutes(), config().getDefaultRetentionSizeInMB())
: policies;
}
return null;
});
asyncResponse.resume(retentionPolicies == null ? Response.noContent().build() : retentionPolicies);
}

protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
Expand Down
Expand Up @@ -686,6 +686,19 @@ public void setRetention(@PathParam("tenant") String tenant, @PathParam("namespa
internalSetRetention(retention);
}

@DELETE
@Path("/{tenant}/{namespace}/retention")
@ApiOperation(value = " Remove retention configuration on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
public void removeRetention(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
validateNamespaceName(tenant, namespace);
internalSetRetention(null);
}

@POST
@Path("/{tenant}/{namespace}/persistence")
@ApiOperation(value = "Set the persistence configuration for all the topics on a namespace.")
Expand Down
Expand Up @@ -1669,10 +1669,11 @@ public void removeDeduplicationEnabled(@Suspended final AsyncResponse asyncRespo
public void getRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
try {
internalGetRetention(asyncResponse);
internalGetRetention(asyncResponse, applied);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
Expand Down
Expand Up @@ -2025,7 +2025,7 @@ private boolean shouldTopicBeRetained() {
long retentionTime = TimeUnit.MINUTES.toNanos(retentionPolicies.getRetentionTimeInMinutes());
// Negative retention time means the topic should be retained indefinitely,
// because its own data has to be retained
return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
return retentionTime <= 0 || (System.nanoTime() - lastActive) < retentionTime;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Expand Up @@ -52,12 +52,14 @@
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -282,6 +284,90 @@ public void testRemoveRetention() throws Exception {
admin.topics().deletePartitionedTopic(testTopic, true);
}

@Test(timeOut = 10000)
public void testRetentionAppliedApi() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
RetentionPolicies brokerPolicies =
new RetentionPolicies(conf.getDefaultRetentionTimeInMinutes(), conf.getDefaultRetentionSizeInMB());
assertEquals(admin.topics().getRetention(topic, true), brokerPolicies);

RetentionPolicies namespacePolicies = new RetentionPolicies(10, 20);
admin.namespaces().setRetention(myNamespace, namespacePolicies);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic, true), namespacePolicies));

RetentionPolicies topicPolicies = new RetentionPolicies(20,30);
admin.topics().setRetention(topic, topicPolicies);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic, true), topicPolicies));

admin.topics().removeRetention(topic);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic, true), namespacePolicies));

admin.namespaces().removeRetention(myNamespace);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic, true), brokerPolicies));
}

@Test(timeOut = 20000)
public void testRetentionPriority() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(admin.topics().getRetention(topic));
assertNull(admin.namespaces().getRetention(myNamespace));

PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Method shouldTopicBeRetained = PersistentTopic.class.getDeclaredMethod("shouldTopicBeRetained");
shouldTopicBeRetained.setAccessible(true);
Field lastActive = PersistentTopic.class.getSuperclass().getDeclaredField("lastActive");
lastActive.setAccessible(true);
//set last active to 2 minutes ago
lastActive.setLong(persistentTopic, System.nanoTime() - TimeUnit.MINUTES.toNanos(2));
//set namespace-level policy
RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
admin.namespaces().setRetention(myNamespace, retentionPolicies);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.namespaces().getRetention(myNamespace)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
// set topic-level policy
admin.topics().setRetention(topic, new RetentionPolicies(3, 1));
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.topics().getRetention(topic)));
assertTrue((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//topic-level disabled
admin.topics().setRetention(topic, new RetentionPolicies(0, 0));
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertEquals(admin.topics().getRetention(topic).getRetentionSizeInMB(), 0));
assertTrue((boolean) shouldTopicBeRetained.invoke(persistentTopic));
// remove topic-level policy
admin.topics().removeRetention(topic);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.topics().getRetention(topic)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//namespace-level disabled
admin.namespaces().setRetention(myNamespace, new RetentionPolicies(0, 0));
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.namespaces().getRetention(myNamespace)));
assertTrue((boolean) shouldTopicBeRetained.invoke(persistentTopic));
//change namespace-level policy
admin.namespaces().setRetention(myNamespace, new RetentionPolicies(1, 1));
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.namespaces().getRetention(myNamespace)));
assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
// remove namespace-level policy
admin.namespaces().removeRetention(myNamespace);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.namespaces().getRetention(myNamespace)));
assertTrue((boolean) shouldTopicBeRetained.invoke(persistentTopic));
}

@Test
public void testCheckPersistence() throws Exception {
PersistencePolicies persistencePolicies = new PersistencePolicies(6, 2, 2, 0.0);
Expand Down
Expand Up @@ -1693,6 +1693,20 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi
*/
CompletableFuture<Void> setRetentionAsync(String namespace, RetentionPolicies retention);

/**
* Remove the retention configuration for all the topics on a namespace.
* @param namespace
* @throws PulsarAdminException
*/
void removeRetention(String namespace) throws PulsarAdminException;

/**
* Remove the retention configuration for all the topics on a namespace asynchronously.
* @param namespace
* @return
*/
CompletableFuture<Void> removeRetentionAsync(String namespace);

/**
* Get the retention configuration for a namespace.
* <p/>
Expand Down
Expand Up @@ -1739,6 +1739,23 @@ CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
*/
CompletableFuture<RetentionPolicies> getRetentionAsync(String topic);

/**
* Get the applied retention configuration for a topic.
* @param topic
* @param applied
* @return
* @throws PulsarAdminException
*/
RetentionPolicies getRetention(String topic, boolean applied) throws PulsarAdminException;

/**
* Get the applied retention configuration for a topic asynchronously.
* @param topic
* @param applied
* @return
*/
CompletableFuture<RetentionPolicies> getRetentionAsync(String topic, boolean applied);

/**
* Remove the retention configuration for all the topics on a topic.
* <p/>
Expand Down
Expand Up @@ -1236,6 +1236,27 @@ public CompletableFuture<Void> setRetentionAsync(String namespace, RetentionPoli
return asyncPostRequest(path, Entity.entity(retention, MediaType.APPLICATION_JSON));
}

@Override
public void removeRetention(String namespace) throws PulsarAdminException {
try {
removeRetentionAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeRetentionAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "retention");
return asyncDeleteRequest(path);
}

@Override
public RetentionPolicies getRetention(String namespace) throws PulsarAdminException {
try {
Expand Down
Expand Up @@ -2075,8 +2075,18 @@ public CompletableFuture<Void> setRetentionAsync(String topic, RetentionPolicies

@Override
public RetentionPolicies getRetention(String topic) throws PulsarAdminException {
return getRetention(topic, false);
}

@Override
public CompletableFuture<RetentionPolicies> getRetentionAsync(String topic) {
return getRetentionAsync(topic, false);
}

@Override
public RetentionPolicies getRetention(String topic, boolean applied) throws PulsarAdminException {
try {
return getRetentionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getRetentionAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -2088,9 +2098,10 @@ public RetentionPolicies getRetention(String topic) throws PulsarAdminException
}

@Override
public CompletableFuture<RetentionPolicies> getRetentionAsync(String topic) {
public CompletableFuture<RetentionPolicies> getRetentionAsync(String topic, boolean applied) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "retention");
path = path.queryParam("applied", applied);
final CompletableFuture<RetentionPolicies> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<RetentionPolicies>() {
Expand Down
Expand Up @@ -419,6 +419,9 @@ public void namespaces() throws Exception {
namespaces.run(split("get-retention myprop/clust/ns1"));
verify(mockNamespaces).getRetention("myprop/clust/ns1");

namespaces.run(split("remove-retention myprop/clust/ns1"));
verify(mockNamespaces).removeRetention("myprop/clust/ns1");

namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s"));
verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1", new DelayedDeliveryPolicies(1000, true));

Expand Down Expand Up @@ -847,6 +850,14 @@ public void topics() throws Exception {
cmdTopics.run(split("set-max-message-size persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopics).setMaxMessageSize("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getRetention("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("set-retention persistent://myprop/clust/ns1/ds1 -t 10m -s 20M"));
verify(mockTopics).setRetention("persistent://myprop/clust/ns1/ds1",
new RetentionPolicies(10, 20));
cmdTopics.run(split("remove-retention persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeRetention("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("get-max-producers persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxProducers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-producers persistent://myprop/clust/ns1/ds1"));
Expand Down
Expand Up @@ -549,6 +549,18 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Remove the retention policy for a namespace")
private class RemoveRetention extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeRetention(namespace);
}
}

@Parameters(commandDescription = "Set the retention policy for a namespace")
private class SetRetention extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
Expand Down Expand Up @@ -1975,6 +1987,7 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {

jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
jcommander.addCommand("remove-retention", new RemoveRetention());

jcommander.addCommand("set-bookie-affinity-group", new SetBookieAffinityGroup());
jcommander.addCommand("get-bookie-affinity-group", new GetBookieAffinityGroup());
Expand Down
Expand Up @@ -1226,10 +1226,13 @@ private class GetRetention extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getRetention(persistentTopic));
print(getAdmin().topics().getRetention(persistentTopic, applied));
}
}

Expand Down