Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When delete a topic, delete the topic policy together. #11316

Merged
merged 11 commits into from
Jul 20, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,10 @@ public Optional<TopicPolicies> getTopicPolicies() {
return brokerService.getTopicPolicies(TopicName.get(topic));
}

public CompletableFuture<Void> deleteTopicPolicies() {
return brokerService.deleteTopicPolicies(TopicName.get(topic));
}

protected int getWaitingProducersCount() {
return waitingExclusiveProducers.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.CompleteFuture;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -2580,6 +2581,17 @@ public Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
}
}

public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
return new CompletableFuture<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to return a completed future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure return completed future is the best way, so followed the origin code style, return completed fusture. I will modify it.

Copy link
Member Author

@horizonzy horizonzy Jul 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally, I maintain the complete fusture way. I think use complete future is more suitable for this method. Cound you give some advice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@horizonzy I think you need a completed CompletableFuture right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the new CompletableFuture should be completed. Such as CompletableFuture.completedFuture(null);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I wasn't clear.
I was suggesting to return "CompletableFuture.completedFuture(null)"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I wasn't clear.
I was suggesting to return "CompletableFuture.completedFuture(null)"

My fault, I misunderstand it. :)

}
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method topicName.getPartitionedTopicName() already handled is the topic is a partition or not. so we don't need to check here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
}

private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions,
CompletableFuture<T> topicFuture) {
Integer maxTopicsPerNamespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,22 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
}

@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
}

@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
CompletableFuture<Void> result = new CompletableFuture<>();
return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
}

private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
TopicPolicies policies) {
createSystemTopicFactoryIfNeeded();

CompletableFuture<Void> result = new CompletableFuture<>();

SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());

Expand All @@ -84,7 +95,7 @@ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, Top
} else {
writer.writeAsync(
PulsarEvent.builder()
.actionType(ActionType.UPDATE)
.actionType(actionType)
.eventType(EventType.TOPIC_POLICY)
.topicPoliciesEvent(
TopicPoliciesEvent.builder()
Expand Down Expand Up @@ -302,10 +313,27 @@ private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
policiesCache.put(
TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic()),
event.getPolicies()
);
TopicName topicName =
TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic());
switch (msg.getValue().getActionType()) {
case INSERT:
TopicPolicies old = policiesCache.putIfAbsent(topicName, event.getPolicies());
if (old != null) {
log.warn("Policy insert failed, the topic: {}' policy already exist", topicName);
}
break;
case UPDATE:
policiesCache.put(topicName, event.getPolicies());
break;
315157973 marked this conversation as resolved.
Show resolved Hide resolved
case DELETE:
policiesCache.remove(topicName);
break;
case NONE:
break;
default:
log.warn("Unknown event action type: {}", msg.getValue().getActionType());
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ public interface TopicPoliciesService {

TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();

/**
* Delete policies for a topic async.
*
* @param topicName topic name
*/
CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName);

/**
* Update policies for a topic async.
*
Expand Down Expand Up @@ -94,6 +101,11 @@ default void clean(TopicName topicName) {

class TopicPoliciesServiceDisabled implements TopicPoliciesService {

@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
return FutureUtil.failedFuture(new UnsupportedOperationException("Topic policies service is disabled."));
}

@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
return FutureUtil.failedFuture(new UnsupportedOperationException("Topic policies service is disabled."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
if (deleteSchema) {
futures.add(deleteSchema().thenApply(schemaVersion -> null));
}
futures.add(deleteTopicPolicies());
FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
CompletableFuture<SchemaVersion> deleteSchemaFuture =
deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null);

deleteSchemaFuture.whenComplete((v, ex) -> {
deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies()).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2395,4 +2395,31 @@ public void testTopicRetentionPolicySetInManagedLedgerConfig() throws Exception
});
}

@Test
public void testPolicyIsDeleteTogether() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a test for covering the topic auto-deletion case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();

Awaitility.await()
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)));

int maxConsumersPerSubscription = 10;
admin.topics().setMaxConsumersPerSubscription(topic, maxConsumersPerSubscription);

Awaitility.await().until(() -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
assertNotNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be simplified to Awaitility.await().untilAssert()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


Awaitility.await().until(() -> pulsar.getBrokerService().getTopic(topic, false).get().isPresent());
assertTrue(pulsar.getBrokerService().getTopic(topic, false).get().isPresent());

admin.topics().delete(topic);

Awaitility.await().until(() -> !pulsar.getBrokerService().getTopic(topic, false).get().isPresent());
assertFalse(pulsar.getBrokerService().getTopic(topic, false).get().isPresent());

Awaitility.await().until(() -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) == null);
assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)));
}

}