Skip to content

Commit

Permalink
[Issue 6043] Support force deleting subscription (#6383)
Browse files Browse the repository at this point in the history
### Motivation

Fixes #6043

### Modifications

Add method `deleteForcefully` to support force deleting subscription.
  • Loading branch information
murong00 committed Mar 25, 2020
1 parent f7ccdb8 commit 525f9e9
Show file tree
Hide file tree
Showing 14 changed files with 312 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,14 @@ protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse,
});
}

protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean force) {
if (force) {
internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative);
} else {
internalDeleteSubscription(asyncResponse, subName, authoritative);
}
}

protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
try {
Expand All @@ -1139,7 +1147,7 @@ protected void internalDeleteSubscription(AsyncResponse asyncResponse, String su
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.deleteSubscriptionAsync(topicNamePartition.toString(), subName));
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
} catch (Exception e) {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName,
e);
Expand Down Expand Up @@ -1205,6 +1213,87 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
}
}

protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
} catch (Exception e) {
log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicNamePartition, subName,
e);
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return null;
} else {
log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}

asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
}
}).exceptionally(ex -> {
log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}

private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
try {
validateAdminAccessForSubscriber(subName, authoritative);
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}
sub.deleteForcefully().get();
log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e);
if (e instanceof WebApplicationException) {
asyncResponse.resume(e);
} else {
log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e);
asyncResponse.resume(new RestException(e));
}
}
}

protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ public void getPartitionedStatsInternal(

@DELETE
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
@ApiOperation(hidden = true, value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. "
+ "Force delete ignores connected consumers and deletes subscription by explicitly closing them.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
Expand All @@ -376,10 +377,11 @@ public void getPartitionedStatsInternal(
public void deleteSubscription(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative);
internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,8 @@ public void getPartitionedStatsInternal(

@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
@ApiOperation(value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. "
+ "Force delete ignores connected consumers and deletes subscription by explicitly closing them.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
Expand All @@ -625,11 +626,13 @@ public void deleteSubscription(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to be deleted")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Disconnect and close all consumers and delete subscription forcefully", defaultValue = "false", type = "boolean")
@QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative);
internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ default long getNumberOfEntriesDelayed() {

CompletableFuture<Void> delete();

CompletableFuture<Void> deleteForcefully();

CompletableFuture<Void> disconnect();

CompletableFuture<Void> doUnsubscribe(Consumer consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,53 @@ public synchronized CompletableFuture<Void> disconnect() {
*/
@Override
public CompletableFuture<Void> delete() {
return delete(false);
}

/**
* Forcefully close all consumers and deletes the subscription.
* @return
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(true);
}

/**
* Delete the subscription by closing and deleting its managed cursor. Handle unsubscribe call from admin layer.
*
* @param closeIfConsumersConnected
* Flag indicate whether explicitly close connected consumers before trying to delete subscription. If
* any consumer is connected to it and if this flag is disable then this operation fails.
* @return CompletableFuture indicating the completion of delete operation
*/
private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

log.info("[{}][{}] Unsubscribing", topicName, subName);

CompletableFuture<Void> closeSubscriptionFuture = new CompletableFuture<>();

if (closeIfConsumersConnected) {
this.disconnect().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, ex);
closeSubscriptionFuture.completeExceptionally(ex);
return null;
});
} else {
this.close().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(exception -> {
log.error("[{}][{}] Error closing subscription", topicName, subName, exception);
closeSubscriptionFuture.completeExceptionally(exception);
return null;
});
}

// cursor close handles pending delete (ack) operations
this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
closeSubscriptionFuture.thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
synchronized (this) {
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,12 +850,53 @@ public synchronized CompletableFuture<Void> disconnect() {
*/
@Override
public CompletableFuture<Void> delete() {
return delete(false);
}

/**
* Forcefully close all consumers and deletes the subscription.
* @return
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(true);
}

/**
* Delete the subscription by closing and deleting its managed cursor. Handle unsubscribe call from admin layer.
*
* @param closeIfConsumersConnected
* Flag indicate whether explicitly close connected consumers before trying to delete subscription. If
* any consumer is connected to it and if this flag is disable then this operation fails.
* @return CompletableFuture indicating the completion of delete operation
*/
private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

log.info("[{}][{}] Unsubscribing", topicName, subName);

CompletableFuture<Void> closeSubscriptionFuture = new CompletableFuture<>();

if (closeIfConsumersConnected) {
this.disconnect().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, ex);
closeSubscriptionFuture.completeExceptionally(ex);
return null;
});
} else {
this.close().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(exception -> {
log.error("[{}][{}] Error closing subscription", topicName, subName, exception);
closeSubscriptionFuture.completeExceptionally(exception);
return null;
});
}

// cursor close handles pending delete (ack) operations
this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
closeSubscriptionFuture.thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
synchronized (this) {
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -1336,6 +1337,55 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception {
admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2");
}

@Test
public void testDeleteSubscription() throws Exception {
final String subName = "test-sub";
final String persistentTopicName = "persistent://prop-xyz/ns1/test-sub-topic";

// disable auto subscription creation
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

// create a topic and produce some messages
publishMessagesOnPersistentTopic(persistentTopicName, 5);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
Lists.newArrayList(persistentTopicName));

// create the subscription by PulsarAdmin
admin.topics().createSubscription(persistentTopicName, subName, MessageId.earliest);

assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName));

// create consumer and subscription
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();

// try to delete the subscription with a connected consumer
try {
admin.topics().deleteSubscription(persistentTopicName, subName);
fail("should have failed");
} catch (PulsarAdminException.PreconditionFailedException e) {
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}

// failed to delete the subscription
assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName));

// try to delete the subscription with a connected consumer forcefully
admin.topics().deleteSubscription(persistentTopicName, subName, true);

// delete the subscription successfully
assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0);

// reset to default
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);

client.close();
}

@Test(dataProvider = "bundling")
public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void testGetSubscriptions() {

// 6) Delete the subscription
response = mock(AsyncResponse.class);
persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true);
persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", false,true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Expand Down
Loading

0 comments on commit 525f9e9

Please sign in to comment.