Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support get topic applied policy for DeduplicationStatus #9339

Merged
merged 9 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncRespons
internalSetAutoSubscriptionCreation(asyncResponse, null);
}

protected void internalModifyDeduplication(boolean enableDeduplication) {
protected void internalModifyDeduplication(Boolean enableDeduplication) {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
updatePolicies(path(POLICIES, namespaceName.toString()), policies ->{
Expand Down Expand Up @@ -1999,6 +1999,11 @@ protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) {
}
}

protected Boolean internalGetDeduplication() {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).deduplicationEnabled;
}

protected Integer internalGetMaxConsumersPerTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_consumers_per_topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2561,7 +2561,20 @@ protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
});
}

protected CompletableFuture<Void> internalSetDeduplicationEnabled(Boolean enabled) {
protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied) {
Boolean deduplicationEnabled = getTopicPolicies(topicName)
.map(TopicPolicies::getDeduplicationEnabled)
.orElseGet(() -> {
if (applied) {
Boolean enabled = getNamespacePolicies(namespaceName).deduplicationEnabled;
return enabled == null ? config().isBrokerDeduplicationEnabled() : enabled;
}
return null;
});
return CompletableFuture.completedFuture(deduplicationEnabled);
}

protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,16 @@ public void setSubscriptionExpirationTime(@PathParam("tenant") String tenant,
internalSetSubscriptionExpirationTime(expirationTime);
}

@GET
@Path("/{tenant}/{namespace}/deduplication")
315157973 marked this conversation as resolved.
Show resolved Hide resolved
@ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetDeduplication();
}

@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
Expand All @@ -349,6 +359,16 @@ public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("
internalModifyDeduplication(enableDeduplication);
}

@DELETE
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Remove broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalModifyDeduplication(null);
}

@POST
@Path("/{tenant}/{namespace}/autoTopicCreation")
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1657,17 +1657,23 @@ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry")})
public void getDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
public void getDeduplication(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isDeduplicationSet()) {
asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
} else {
asyncResponse.resume(Response.noContent().build());
}
internalGetDeduplication(applied).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed get Deduplication", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed get Deduplication", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(res);
}
});
}

@POST
Expand All @@ -1677,15 +1683,15 @@ public void getDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry")})
public void setDeduplicationEnabled(
public void setDeduplication(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "DeduplicationEnabled policies for the specified topic")
Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> {
internalSetDeduplication(enabled).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated deduplication", ex);
asyncResponse.resume(ex);
Expand All @@ -1706,12 +1712,12 @@ public void setDeduplicationEnabled(
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
public void removeDeduplication(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setDeduplicationEnabled(asyncResponse, tenant, namespace, encodedTopic, null);
setDeduplication(asyncResponse, tenant, namespace, encodedTopic, null);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2738,6 +2738,8 @@ public void onUpdate(TopicPolicies policies) {
replicators.forEach((name, replicator) -> replicator.getRateLimiter()
.ifPresent(DispatchRateLimiter::updateDispatchRate));
updateUnackedMessagesExceededOnConsumer(null);

checkDeduplicationStatus();
}

private Optional<Policies> getNamespacePolicies() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.testng.annotations.Test;
import org.awaitility.Awaitility;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
Expand Down Expand Up @@ -86,6 +88,121 @@ public void testDuplicationApi() throws Exception {
assertNull(admin.topics().getDeduplicationEnabled(topicName));
}

@Test(timeOut = 10000)
public void testTopicDuplicationApi2() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Boolean enabled = admin.topics().getDeduplicationStatus(topicName);
assertNull(enabled);

admin.topics().setDeduplicationStatus(topicName, true);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> admin.topics().getDeduplicationStatus(topicName) != null);
Assert.assertEquals(admin.topics().getDeduplicationStatus(topicName), true);

admin.topics().removeDeduplicationStatus(topicName);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null);
assertNull(admin.topics().getDeduplicationStatus(topicName));
}

@Test(timeOut = 10000)
public void testTopicDuplicationAppliedApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
waitCacheInit(topicName);
assertNull(admin.namespaces().getDeduplicationStatus(myNamespace));
assertNull(admin.topics().getDeduplicationStatus(topicName));
assertEquals(admin.topics().getDeduplicationStatus(topicName, true).booleanValue(),
conf.isBrokerDeduplicationEnabled());

admin.namespaces().setDeduplicationStatus(myNamespace, false);
Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getDeduplicationStatus(topicName, true)));
admin.topics().setDeduplicationStatus(topicName, true);
Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getDeduplicationStatus(topicName, true)));

admin.topics().removeDeduplicationStatus(topicName);
Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getDeduplicationStatus(topicName, true)));
admin.namespaces().removeDeduplicationStatus(myNamespace);
Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getDeduplicationStatus(topicName, true).booleanValue(),
conf.isBrokerDeduplicationEnabled()));
}

@Test(timeOut = 30000)
public void testDeduplicationPriority() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
final String producerName = "my-producer";
final int maxMsgNum = 5;
waitCacheInit(topicName);
//1) Start up producer and send msg.We specified the max sequenceId
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
.producerName(producerName).create();
long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer);
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
//broker-level deduplication is enabled in setup() by default
checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq);
//disabled in namespace-level
admin.namespaces().setDeduplicationStatus(myNamespace, false);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
sendMessageAndGetMaxSeq(maxMsgNum, producer);
checkDeduplicationDisabled(producerName, messageDeduplication);
//enabled in topic-level
admin.topics().setDeduplicationStatus(topicName, true);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getDeduplicationStatus(topicName)));
Awaitility.await().untilAsserted(() -> assertTrue(messageDeduplication.isEnabled()));
long maxSeq2 = sendMessageAndGetMaxSeq(maxMsgNum, producer);
checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq2);
//remove topic-level, use namespace-level
admin.topics().removeDeduplicationStatus(topicName);
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getDeduplicationStatus(topicName)));
Awaitility.await().untilAsserted(() -> assertFalse(messageDeduplication.isEnabled()));
producer.newMessage().value("msg").sequenceId(1).send();
checkDeduplicationDisabled(producerName, messageDeduplication);
//remove namespace-level , use broker-level
admin.namespaces().removeDeduplicationStatus(myNamespace);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getDeduplicationStatus(myNamespace)));
Awaitility.await().untilAsserted(() -> assertTrue(messageDeduplication.isEnabled()));
long maxSeq3 = sendMessageAndGetMaxSeq(maxMsgNum, producer);
checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq3);
}

private long sendMessageAndGetMaxSeq(int maxMsgNum, Producer producer) throws Exception{
long seq = System.nanoTime();
for (int i = 0; i <= maxMsgNum; i++) {
producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
}
return seq + maxMsgNum;
}

private void checkDeduplicationDisabled(String producerName, MessageDeduplication messageDeduplication) throws Exception {
messageDeduplication.checkStatus().whenComplete((res, ex) -> {
if (ex != null) {
fail("should not fail");
}
assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
assertEquals(messageDeduplication.highestSequencedPushed.size(), 0);
}).get();
}

private void checkDeduplicationEnabled(String producerName, MessageDeduplication messageDeduplication,
long maxSeq) throws Exception {
messageDeduplication.checkStatus().whenComplete((res, ex) -> {
if (ex != null) {
fail("should not fail");
}
assertNotNull(messageDeduplication.highestSequencedPersisted);
assertNotNull(messageDeduplication.highestSequencedPushed);
long seqId = messageDeduplication.getLastPublishedSequenceId(producerName);
assertEquals(seqId, maxSeq);
assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(), maxSeq);
assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(), maxSeq);
}).get();
}

@Test(timeOut = 10000)
public void testDuplicationSnapshotApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
Expand Down Expand Up @@ -192,27 +309,13 @@ public void testDuplicationMethod() throws Exception {
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
.producerName(producerName).create();
long seq = System.currentTimeMillis();
for (int i = 0; i <= maxMsgNum; i++) {
producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
}
long maxSeq = seq + maxMsgNum;
long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer);
//2) Max sequenceId should be recorded correctly
CompletableFuture<Optional<Topic>> completableFuture = pulsar.getBrokerService().getTopics().get(topicName);
Topic topic = completableFuture.get(1, TimeUnit.SECONDS).get();
PersistentTopic persistentTopic = (PersistentTopic) topic;
MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
messageDeduplication.checkStatus().whenComplete((res, ex) -> {
if (ex != null) {
fail("should not fail");
}
assertNotNull(messageDeduplication.highestSequencedPersisted);
assertNotNull(messageDeduplication.highestSequencedPushed);
long seqId = messageDeduplication.getLastPublishedSequenceId(producerName);
assertEquals(seqId, maxSeq);
assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(), maxSeq);
assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(), maxSeq);
}).get();
checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq);
//3) disable the deduplication check
admin.topics().enableDeduplication(topicName, false);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
Expand All @@ -221,14 +324,7 @@ public void testDuplicationMethod() throws Exception {
producer.newMessage().value("msg-" + i).sequenceId(maxSeq + i).send();
}
//4) Max sequenceId record should be clear
messageDeduplication.checkStatus().whenComplete((res, ex) -> {
if (ex != null) {
fail("should not fail");
}
assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
assertEquals(messageDeduplication.highestSequencedPushed.size(), 0);
}).get();
checkDeduplicationDisabled(producerName, messageDeduplication);

}

Expand Down Expand Up @@ -315,7 +411,7 @@ public void testNamespacePolicyApi() throws Exception {
}

@Test(timeOut = 30000)
private void testNamespacePolicyTakeSnapshot() throws Exception {
public void testNamespacePolicyTakeSnapshot() throws Exception {
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
Expand Down Expand Up @@ -367,7 +463,7 @@ private void testNamespacePolicyTakeSnapshot() throws Exception {
}

@Test(timeOut = 30000)
private void testDisableNamespacePolicyTakeSnapshot() throws Exception {
public void testDisableNamespacePolicyTakeSnapshot() throws Exception {
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
Expand Down