Skip to content

Commit

Permalink
[fix][broker] fix can not revoke permission after update topic partit…
Browse files Browse the repository at this point in the history
…ion (apache#17393)

* add unittest for revoke permission of topic after update topic partition

* fix revoke permission of partition

Co-authored-by: fanjianye <fanjianye@bigo.sg>
(cherry picked from commit 8c4aad5)
(cherry picked from commit 2d0190f)
  • Loading branch information
TakaHiR07 authored and nicoloboschi committed Dec 6, 2022
1 parent f29e5bb commit e83ef25
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
Expand Up @@ -326,7 +326,7 @@ protected void internalDeleteTopicForcefully(boolean authoritative) {
}
}

private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role, boolean force) {
return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
policiesOptional -> {
Policies policies = policiesOptional.orElseThrow(() ->
Expand All @@ -335,8 +335,12 @@ private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String r
|| !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
clientAppId(), role, topicUri);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Permissions are not set at the topic level"));
if (force) {
return CompletableFuture.completedFuture(null);
} else {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Permissions are not set at the topic level"));
}
}
// Write the new policies to metadata store
return namespaceResources().setPoliciesAsync(namespaceName, p -> {
Expand All @@ -362,10 +366,10 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenComposeAsync(unused ->
revokePermissionsAsync(topicNamePartition.toString(), role));
revokePermissionsAsync(topicNamePartition.toString(), role, true));
}
}
return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role))
return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role, false))
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
}))
).exceptionally(ex -> {
Expand Down
Expand Up @@ -209,6 +209,8 @@ public void testProxyAuthorization() throws Exception {
* 2. Update the topic partition number to 4.
* 3. Use new producer/consumer with client role to process the topic.
* 4. Broker should authorize producer/consumer normally.
* 5. revoke produce/consumer permission of topic
* 6. new producer/consumer should not be authorized
* </pre>
*/
@Test
Expand Down Expand Up @@ -290,6 +292,26 @@ public void testUpdatePartitionNumAndReconnect() throws Exception {
Assert.assertEquals(messageSet, receivedMessageSet);
consumer.close();
producer.close();

// revoke produce/consume permission
admin.topics().revokePermissions(topicName, CLIENT_ROLE);

// produce/consume the topic should fail
try {
consumer = proxyClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName).subscribe();
Assert.fail("Should not pass");
} catch (PulsarClientException.AuthorizationException ex) {
// ok
}
try {
producer = proxyClient.newProducer(Schema.BYTES)
.topic(topicName).create();
Assert.fail("Should not pass");
} catch (PulsarClientException.AuthorizationException ex) {
// ok
}
log.info("-- Exiting {} test --", methodName);
}

Expand Down

0 comments on commit e83ef25

Please sign in to comment.