Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 6043] Support force deleting subscription #6383

Merged
merged 6 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,14 @@ protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse,
});
}

protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean force) {
if (force) {
internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative);
} else {
internalDeleteSubscription(asyncResponse, subName, authoritative);
}
}

protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
try {
Expand All @@ -1067,7 +1075,7 @@ protected void internalDeleteSubscription(AsyncResponse asyncResponse, String su
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.deleteSubscriptionAsync(topicNamePartition.toString(), subName));
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
} catch (Exception e) {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName,
e);
Expand Down Expand Up @@ -1133,6 +1141,87 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
}
}

protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
} catch (Exception e) {
log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicNamePartition, subName,
e);
asyncResponse.resume(new RestException(e));
return;
}
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return null;
} else {
log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, t);
asyncResponse.resume(new RestException(t));
return null;
}
}

asyncResponse.resume(Response.noContent().build());
return null;
});
} else {
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
}
}).exceptionally(ex -> {
log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}

private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
try {
validateAdminAccessForSubscriber(subName, authoritative);
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}
sub.deleteForcefully().get();
log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e);
if (e instanceof WebApplicationException) {
asyncResponse.resume(e);
} else {
log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e);
asyncResponse.resume(new RestException(e));
}
}
}

protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
if (topicName.isGlobal()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ public void getPartitionedStatsInternal(

@DELETE
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
@ApiOperation(hidden = true, value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. "
+ "Force delete ignores connected consumers and deletes subscription by explicitly closing them.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
Expand All @@ -376,10 +377,11 @@ public void getPartitionedStatsInternal(
public void deleteSubscription(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative);
internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,8 @@ public void getPartitionedStatsInternal(

@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
@ApiOperation(value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. "
+ "Force delete ignores connected consumers and deletes subscription by explicitly closing them.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
Expand All @@ -622,11 +623,13 @@ public void deleteSubscription(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to be deleted")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Disconnect and close all consumers and delete subscription forcefully", defaultValue = "false", type = "boolean")
@QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative);
internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ default long getNumberOfEntriesDelayed() {

CompletableFuture<Void> delete();

CompletableFuture<Void> deleteForcefully();

CompletableFuture<Void> disconnect();

CompletableFuture<Void> doUnsubscribe(Consumer consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,53 @@ public synchronized CompletableFuture<Void> disconnect() {
*/
@Override
public CompletableFuture<Void> delete() {
return delete(false);
}

/**
* Forcefully close all consumers and deletes the subscription.
* @return
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(true);
}

/**
* Delete the subscription by closing and deleting its managed cursor. Handle unsubscribe call from admin layer.
*
* @param closeIfConsumersConnected
* Flag indicate whether explicitly close connected consumers before trying to delete subscription. If
* any consumer is connected to it and if this flag is disable then this operation fails.
* @return CompletableFuture indicating the completion of delete operation
*/
private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

log.info("[{}][{}] Unsubscribing", topicName, subName);

CompletableFuture<Void> closeSubscriptionFuture = new CompletableFuture<>();

if (closeIfConsumersConnected) {
this.disconnect().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, ex);
closeSubscriptionFuture.completeExceptionally(ex);
return null;
});
} else {
this.close().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(exception -> {
log.error("[{}][{}] Error closing subscription", topicName, subName, exception);
closeSubscriptionFuture.completeExceptionally(exception);
return null;
});
}

// cursor close handles pending delete (ack) operations
this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
closeSubscriptionFuture.thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
synchronized (this) {
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,12 +850,53 @@ public synchronized CompletableFuture<Void> disconnect() {
*/
@Override
public CompletableFuture<Void> delete() {
return delete(false);
}

/**
* Forcefully close all consumers and deletes the subscription.
* @return
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(true);
}

/**
* Delete the subscription by closing and deleting its managed cursor. Handle unsubscribe call from admin layer.
*
* @param closeIfConsumersConnected
* Flag indicate whether explicitly close connected consumers before trying to delete subscription. If
* any consumer is connected to it and if this flag is disable then this operation fails.
* @return CompletableFuture indicating the completion of delete operation
*/
private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

log.info("[{}][{}] Unsubscribing", topicName, subName);

CompletableFuture<Void> closeSubscriptionFuture = new CompletableFuture<>();

if (closeIfConsumersConnected) {
this.disconnect().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, ex);
closeSubscriptionFuture.completeExceptionally(ex);
return null;
});
} else {
this.close().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(exception -> {
log.error("[{}][{}] Error closing subscription", topicName, subName, exception);
closeSubscriptionFuture.completeExceptionally(exception);
return null;
});
}

// cursor close handles pending delete (ack) operations
this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
closeSubscriptionFuture.thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
synchronized (this) {
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -1302,6 +1303,55 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception {
admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2");
}

@Test
public void testDeleteSubscription() throws Exception {
final String subName = "test-sub";
final String persistentTopicName = "persistent://prop-xyz/ns1/test-sub-topic";

// disable auto subscription creation
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

// create a topic and produce some messages
publishMessagesOnPersistentTopic(persistentTopicName, 5);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
Lists.newArrayList(persistentTopicName));

// create the subscription by PulsarAdmin
admin.topics().createSubscription(persistentTopicName, subName, MessageId.earliest);

assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName));

// create consumer and subscription
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();

// try to delete the subscription with a connected consumer
try {
admin.topics().deleteSubscription(persistentTopicName, subName);
fail("should have failed");
} catch (PulsarAdminException.PreconditionFailedException e) {
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}

// failed to delete the subscription
assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName));

// try to delete the subscription with a connected consumer forcefully
admin.topics().deleteSubscription(persistentTopicName, subName, true);

// delete the subscription successfully
assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be a flaky test since the consumer can create a subscription when they reconnect to the broker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, maybe we can disable this line in this pr first and then refine it later when the ability to disable the subscription auto-creation is available?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hava fixed the unit test to cover this.


// reset to default
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);

client.close();
}

@Test(dataProvider = "bundling")
public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void testGetSubscriptions() {

// 6) Delete the subscription
response = mock(AsyncResponse.class);
persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true);
persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", false,true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Expand Down
Loading