support truncate topic #10326
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.*;
Please avoid using star imports.
You're right. I've updated my IDE configuration, and I'll fix these problems right away.
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.collect.*;
Please avoid using star imports.
You're right. I've updated my IDE configuration, and I'll fix these problems right away.
@@ -3922,4 +3922,66 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
}
}
}

protected void internalTruncateTopic(AsyncResponse asyncResponse) {
We should check the topic ownership first and then get the topic reference, so that we can use the topic's ManagedLedger to truncate directly.
+1
I see. Thank you very much. I will change it to check the topic ownership first and then get the topic reference. At the same time, the implementation has been moved from brokerService to Topic.
BTW, the truncate operation only works on persistent topics, so we should add a check on the topic domain and return code 412 when the input topic name is a non-persistent topic.
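A minimal sketch of the domain check suggested here, assuming topic names carry a `persistent://` or `non-persistent://` prefix. The class and method names (`TruncateDomainCheck`, `statusForTruncate`) are hypothetical and are not taken from this PR or from Pulsar's code base.

```java
// Hypothetical helper, not Pulsar's actual code: maps a topic name to the
// HTTP status a truncate request would get, based only on the domain prefix.
final class TruncateDomainCheck {
    static final int NO_CONTENT = 204;
    static final int PRECONDITION_FAILED = 412;

    // Assumption: names look like "persistent://tenant/ns/topic" or
    // "non-persistent://tenant/ns/topic".
    static boolean isPersistent(String topicName) {
        return topicName.startsWith("persistent://");
    }

    // Only persistent topics can be truncated; anything else is rejected with 412.
    static int statusForTruncate(String topicName) {
        return isPersistent(topicName) ? NO_CONTENT : PRECONDITION_FAILED;
    }

    public static void main(String[] args) {
        System.out.println(statusForTruncate("persistent://public/default/orders"));
        System.out.println(statusForTruncate("non-persistent://public/default/orders"));
    }
}
```

Rejecting the request before any topic reference is looked up keeps the non-persistent case cheap and makes the 412 response independent of broker state.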
add non-persistent function to throw exception by "nosupport truncate".
update partition topic assert.
This is a great feature.
Please address @codelipenghui's comments.
@ApiOperation(value = "Truncate a topic.",
        notes = "NonPersistentTopic is not support truncate.")
@ApiResponses(value = {
        @ApiResponse(code = 412, message = "NonPersistentTopic is not support truncate")
NonPersistentTopic does not support truncate
Thank you very much, I will fix it as soon as possible.
@DELETE
@Path("/{tenant}/{namespace}/{topic}/truncate")
@ApiOperation(value = "Truncate a topic.",
        notes = "The latest ledger cannot be deleted ,and only delete acknowledged ledgers.")
The latest ledger will not be deleted, and only acknowledged ledgers can be deleted.
Thank you very much, I will fix it as soon as possible.
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
.topic(topicName)
please do not reformat
@@ -286,6 +287,19 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Truncate a topic. \n"
        + "\t\tThe topic will be truncate, but the latest ledger cannot be deleted.")
"The topic will be truncated, but the latest ledger cannot be deleted"
Thank you very much, I will fix it as soon as possible.
@@ -484,6 +485,19 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Truncate a topic. \n"
        + "\t\tThe topic will be truncate, but the latest ledger cannot be deleted.")
"The topic will be truncated, but the latest ledger cannot be deleted"
Thank you very much, I will fix it as soon as possible.
Great work! Left some comments.
We could add an end-to-end test for this case: for example, create a producer and a consumer, produce and consume some messages, then call the truncate API and check the result.
@@ -18,6 +18,7 @@
 */
package org.apache.bookkeeper.mledger;

import java.util.concurrent.CompletableFuture;
Is this import necessary?
Sorry, I added this import in a previous change. I will remove it.
Topic topic;
try {
    validateAdminAccessForTenant(topicName.getTenant());
    validateTopicOwnership(topicName, authoritative);
    topic = getTopicReference(topicName);
} catch (Exception e) {
    log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
    resumeAsyncResponseExceptionally(asyncResponse, e);
    return;
}
CompletableFuture<Void> future = topic.truncate();
future.thenAccept(a -> {
    asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
            Response.Status.NO_CONTENT.getReasonPhrase()));
}).exceptionally(e -> {
    asyncResponse.resume(e);
    return null;
});
It seems that this block is repeated.
Thank you for your comment. This block handles the case (topicName.isPartitioned()==true) and the other handles (meta.partitions == 0). I will merge them into one function.
Topic topic;
try {
    validateAdminAccessForTenant(topicName.getTenant());
    validateTopicOwnership(topicName, authoritative);
    topic = getTopicReference(topicName);
} catch (Exception e) {
    log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
    resumeAsyncResponseExceptionally(asyncResponse, e);
    return;
}
CompletableFuture<Void> future = topic.truncate();
future.thenAccept(a -> {
    asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
            Response.Status.NO_CONTENT.getReasonPhrase()));
}).exceptionally(e -> {
    asyncResponse.resume(e);
    return null;
});
It seems that this block repeats the one above.
Thank you for your comment. This block handles the case (topicName.isPartitioned()==true) and the other handles (meta.partitions == 0). I will merge them into one function.
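One way the two repeated blocks could be merged is a single helper that both branches call once they hold the truncate future. The sketch below uses only the JDK; `TruncateResponseHelper`, `resumeWhenDone`, and the `Consumer<Object>` standing in for the async response are hypothetical names chosen for illustration, not the PR's final code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical helper shared by the partitioned and non-partitioned branches.
final class TruncateResponseHelper {
    static final int NO_CONTENT = 204;

    // Resumes the response sink with 204 on success, or with the failure cause.
    // "resume" stands in for AsyncResponse.resume(...) in this sketch.
    static void resumeWhenDone(CompletableFuture<Void> truncateFuture, Consumer<Object> resume) {
        truncateFuture.whenComplete((ignored, error) -> {
            if (error != null) {
                resume.accept(error);
            } else {
                resume.accept(NO_CONTENT);
            }
        });
    }

    public static void main(String[] args) {
        List<Object> responses = new ArrayList<>();

        // Branch 1: truncate succeeded.
        resumeWhenDone(CompletableFuture.completedFuture(null), responses::add);

        // Branch 2: truncate failed.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("truncate failed"));
        resumeWhenDone(failed, responses::add);

        System.out.println(responses);
    }
}
```

With a helper of this shape, each branch only differs in how it obtains the future, and the success and failure handling lives in one place.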

/**
 * Truncate ledgers
 * The latest ledger cannot be deleted ,and only delete acknowledged ledgers
Only delete ledgers before the mark delete position, right?
You are right. The description has been changed to ‘The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.’
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
for (ManagedCursor cursor : cursors) {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {
Why do we need to clear the backlog?
If it doesn't clear the backlog, the operation is a trim rather than a truncate. That is why I clear it.
Won't clearing the backlog lose unacknowledged messages?
The original request is to delete retained messages. Retained messages are acknowledged messages, while messages in the backlog are unacknowledged messages.
There are already dedicated commands to clear the whole backlog or expire certain messages in the backlog. For this command we should just delete as many retained messages (messages already consumed and acknowledged by all consumers) as possible, so we shouldn't clear the backlog here.
Thank you very much for your comments. I think 'truncate' is more like deleting all the messages than deleting only the consumed messages. I would like to add a 'trim' function in the next PR.
@@ -2316,10 +2329,10 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
        ls.getLedgerId());
break;
} else if (expired) {
} else if (expired || isTruncate) {
log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
When expired is false and isTruncate is true, it seems that the debug info needs to be updated.
+1 please update log info.
Thank you very much. I will fix this debug info.
log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
ledgersToDelete.add(ls);
} else if (overRetentionQuota) {
} else if (overRetentionQuota || isTruncate) {
log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());
When overRetentionQuota is false and isTruncate is true, it seems that the debug info needs to be updated.
Thank you very much. I will fix this debug info.
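To illustrate the log fix requested in the two threads above, the reason text can be chosen from the flags rather than hard-coded per branch. This is only a sketch: `TrimReason` and `describe` are hypothetical names, and giving `isTruncate` priority over the other flags is an assumption about the desired wording, not something the PR states.

```java
// Hypothetical helper: picks the debug wording for why a ledger is being deleted,
// so a truncate is never logged as "has expired" or "is over quota".
final class TrimReason {
    static String describe(boolean expired, boolean overRetentionQuota, boolean isTruncate) {
        if (isTruncate) {
            return "is being deleted by truncate";
        }
        if (expired) {
            return "has expired";
        }
        if (overRetentionQuota) {
            return "is over quota";
        }
        return "is retained";
    }

    public static void main(String[] args) {
        // Formats a line in the same shape as the existing debug messages.
        System.out.println(String.format("[%s] Ledger %d %s",
                "my-ledger", 42L, describe(false, false, true)));
    }
}
```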
As Enrico suggested, please remove the whitespace and format changes, especially the indentation changes. For new code it's fine to use a good format of your own, but if everyone keeps reformatting existing code in their commits, it creates too much noise in the PR and makes it hard to review.
@@ -2156,6 +2157,14 @@ private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS);
Are we still using this method? If not, we can just remove it.
Sorry, does "this method" refer to 'scheduledExecutor.schedule()' or 'trimConsumedLedgersInBackground'? If it's one of them, please tell me how to modify it.
I reviewed the code. This function is private and unused in the ManagedLedgerImpl class, so it should be removed.
CompletableFuture<Void> future = ledger2.asyncTruncate();
future.get();

assertTrue(ledger2.getLedgersInfoAsList().size() <= 1);
Can we use setMaxEntriesPerLedger to make the managed ledger create a new ledger, so that even though the current ledger won't be deleted, we can see older ledgers get removed?
@MarvinCai We have a minimum rollover time limitation; the minimum value we are able to set is 1 min. So I think setMaxEntriesPerLedger may not work here.
Lgtm
LGTM
Fixes #9597
Motivation
Add support for truncating all data of a topic without disconnecting the producers and consumers.
Modifications
Add an API to truncate the data of a topic, which can delete all inactive ledgers even if data retention is enabled. The truncate operation moves all cursors to the end of the topic and deletes all inactive ledgers. It does not roll over the current ledger, so the active ledger is not deleted. In that case, if users want to delete all data from the topic and no new data is currently being written to it, the remaining data is deleted only after a ledger rollover is triggered.
In the future we can add a separate command to roll over the ledger manually, decoupled from the truncate API. From the admin side, the rollover API and the truncate API could then be combined to delete both the current active ledger and the inactive ledgers.
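The semantics described above can be illustrated with a small in-memory model. This is a toy sketch under stated assumptions, not the ManagedLedger implementation: `ToyTopic` and all of its members are hypothetical, ledgers are plain lists, rollover happens purely on entry count, and a cursor is just a count of acknowledged messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical names, unrelated to ManagedLedgerImpl.
final class ToyTopic {
    private final int maxEntriesPerLedger;
    // The last element is the active ledger; all earlier ones are inactive.
    private final List<List<String>> ledgers = new ArrayList<>();
    // Cursor name -> number of messages that cursor has moved past.
    private final Map<String, Long> cursors = new HashMap<>();
    private long published = 0;

    ToyTopic(int maxEntriesPerLedger) {
        this.maxEntriesPerLedger = maxEntriesPerLedger;
        ledgers.add(new ArrayList<>());
    }

    void subscribe(String name) {
        cursors.put(name, published);
    }

    void publish(String message) {
        List<String> active = ledgers.get(ledgers.size() - 1);
        if (active.size() >= maxEntriesPerLedger) {
            // Roll over: the full ledger becomes inactive, a new one becomes active.
            active = new ArrayList<>();
            ledgers.add(active);
        }
        active.add(message);
        published++;
    }

    // Move every cursor to the end of the topic and drop all inactive ledgers.
    // The active ledger is kept because no rollover is triggered here.
    void truncate() {
        cursors.replaceAll((name, position) -> published);
        ledgers.subList(0, ledgers.size() - 1).clear();
    }

    int ledgerCount() {
        return ledgers.size();
    }

    long backlog(String cursorName) {
        return published - cursors.get(cursorName);
    }

    int storedMessages() {
        int total = 0;
        for (List<String> ledger : ledgers) {
            total += ledger.size();
        }
        return total;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(2);
        topic.subscribe("sub");
        for (String m : new String[] {"a", "b", "c", "d", "e"}) {
            topic.publish(m);
        }
        System.out.println("before: ledgers=" + topic.ledgerCount() + " backlog=" + topic.backlog("sub"));
        topic.truncate();
        System.out.println("after: ledgers=" + topic.ledgerCount() + " backlog=" + topic.backlog("sub")
                + " stored=" + topic.storedMessages());
    }
}
```

In this model the message still sitting in the active ledger survives the truncate, which mirrors the note that remaining data is only removed once a rollover happens.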
Verifying this change
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation