Using Readers still causes backlog quota to be observed #6787
Conversation
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
I'd like to understand the behavior described in the description - "It is also used to determine up to which ledger we can trim. Thus, with this change, reader's ephemeral subscriptions won't be preventing ledgers from being clean up as well which is what you expect the behavior to be."
Does this result in any data loss or unexpected behaviors when using the reader API?
The reader, per definition, doesn't cause the data to be retained. In this case, the moment the reader is temporarily disconnected, the data would be deleted immediately, if the retention policy says so. For that, an application using readers needs to configure appropriate retention. The only difference here is when the readers are connected. The current behavior is to consider the reader subject to the backlog quota policy, which is incorrect given that backlog only applies to a subscription.
@@ -164,6 +175,22 @@ public ManagedCursor getSlowestReader() {
        }
    }

    public boolean hasCursors() {
It's becoming a bit confusing now to understand the meaning of hasCursors() and isEmpty(). We should use more unambiguous/explicit naming here.
My suggestion:
- hasCursors() --> isEmpty() // Since we're talking about a cursors container
- isEmpty() --> hasDurableCursors() (and fix the reversal of the boolean logic)
// Fallback to read lock
stamp = rwLock.readLock();
try {
    isEmpty = cursors.isEmpty();
Wouldn't the logic be the reverse here? We're checking isEmpty, but the method should return true if it's not empty, no?
Yeah, I just used the same logic as "isEmpty", but the name of the method hints at a different semantic meaning.
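For reference, a minimal sketch of the optimistic-read pattern the snippet above is taken from, using the suggested hasDurableCursors() name and with the boolean logic un-reversed. This is a simplified, self-contained illustration with hypothetical field types, not the actual ManagedCursorContainer code:

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.StampedLock;

class CursorContainerCheckSketch {
    private final StampedLock rwLock = new StampedLock();
    // Stand-in for the container's map of durable cursors.
    private final ConcurrentSkipListMap<String, Object> cursors = new ConcurrentSkipListMap<>();

    public boolean hasDurableCursors() {
        // Try an optimistic read first; validate and fall back to the read lock
        // if a concurrent write invalidated the stamp.
        long stamp = rwLock.tryOptimisticRead();
        boolean isEmpty = cursors.isEmpty();
        if (!rwLock.validate(stamp)) {
            // Fallback to read lock
            stamp = rwLock.readLock();
            try {
                isEmpty = cursors.isEmpty();
            } finally {
                rwLock.unlockRead(stamp);
            }
        }
        // Return true when durable cursors exist, i.e. the inverse of isEmpty.
        return !isEmpty;
    }
}
```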
Force-pushed from 3cf95c4 to 3603a85.
@jerrypeng @merlimat I think this should be a flag to allow people to choose whether to include readers for backlog quota consideration or not. The behavior of a non-durable cursor reading a managed ledger should be close to the behavior of a file reader reading a file in the filesystem. If a reader is actively reading a file, the file is still accessible even if it is deleted. If there is a reader active in the managed ledger, the data shouldn't be deleted if the reader hasn't read it. Otherwise, it is a violation of the correctness of a "distributed log". Thus, this behavior shouldn't be changed by default. If there is a requirement of excluding readers from backlog consideration, we should use a setting to let people opt in to this feature.
@sijie the default behavior of stored data in Pulsar is not exactly the same as data stored in a file. By default data is not persisted in Pulsar at all. Only if a topic has a subscription or retention configured is the data actually persisted. Readers are by definition ephemeral and have no persistent state from a broker's perspective, thus if you want Readers to be able to read a certain amount of data, the user should configure the appropriate retention. Readers are responsible for reading the data and not for determining how much data is retained. I believe those are two separate concepts. I would also argue that if you are using the Reader API and you want some level of guarantee of reading the data, you would have to configure retention. If you don't and the Reader dies, the data is not guaranteed to be there when the Reader comes back.
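To illustrate "configure the appropriate retention", a sketch using the Pulsar Java admin client is shown below; the service URL, namespace, and limits are placeholder values, not settings introduced by this PR:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.RetentionPolicies;

public class SetRetentionExample {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // placeholder broker admin URL
                .build()) {
            // Retain data for 24 hours or up to 512 MB (whichever limit is hit first),
            // independently of any durable subscription, so readers can rely on it.
            admin.namespaces().setRetention("public/default",
                    new RetentionPolicies(24 * 60 /* minutes */, 512 /* MB */));
        }
    }
}
```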
There is absolutely no violation of correctness. The contract is clear, and it's that a reader can read all the data that is being retained based on max time and size. Above the data retention thresholds, there is no guarantee of being able to read the data. That is very important, because a reader should not, under any condition, cause the backlog quota to fill up. In particular a reader should not have an impact on a producer. If we allow a reader to stay connected and have the data retained, then what would the limit be, and what would be the action after the limit? The current behavior is an accidental one that, in any case, is not something one can rely upon (e.g.: a brief disconnection of the reader will cause the data to be deleted anyway).
This only covers 50% of the contract. If a reader is scanning a log, it should receive events in order, with no data gaps. @jerrypeng @merlimat I am not arguing about data retention. Please get my point correctly. My argument is about the "expected" behavior when a reader is attached to a "distributed log". The reader should be able to read the messages from this cursor without missing any data. This is the behavior you can get from a storage system like any local or distributed file system. If we are saying Pulsar is an event/stream storage system, that is the correct behavior we should provide.
If you look into any file system, when you are opening a file to read and there is a background process deleting the file for whatever reason, the reader can still read the file until it is closed. The file system only reclaims disk space when the last active open file descriptor is closed. The problem with this change is not about data retention. The problem with this change is that it introduces uncertainty in the Reader API that people cannot trust. The Reader API is effectively a storage API that people rely on for building stateful applications. We should take this seriously and follow a common semantic that most storage systems provide. As I said, if you want to relax this contract, it should be done via a flag. We should allow users to decide which behavior they want to choose. If I am building stateful applications that use Pulsar as the source of truth, I don't want to see those uncertainties.
That's not what the contract is. And it's not how the reader was designed. Take a look at #355
That is to say: "The reader should receive events in order and there are no data gaps, while operating within the configured constraints". Again, I want to be absolutely clear that this change is not relaxing any guarantee over current behavior. We should not be mixing the life-cycles of two different components: the reader and the data being retained.
Right now, the data is only retained within the scope of the configured retention policy. Since the fact that a "NonDurableSubscription" retains data goes directly against the stated goal, and it doesn't provide any guarantees, it should be regarded as a bug, not as an optional feature.
A file system is also operating within system constraints. When the disk (or user quota) is full, some actions will have to be taken.
Correct. But the semantic is still maintained. The file is not deleted until the file descriptor is closed. If a reader is active on a stream, the data after the reader position shouldn't be deleted. I think this is the minimum requirement that a reader should provide. Again, I am fine with introducing this change. But a flag should be added to enable this, and the default behavior should be that the data isn't removed while a reader is active.
As I explained above, this is not how the reader works right now. And there is no way to "guarantee" that behavior right now. I don't think a flag would change that. Supporting by default a "half" guarantee doesn't give anything that an application could rely upon.
The definition of active is tricky, since by definition the reader is tied to the TCP connection. To be clear, the behavior of the Pulsar reader was modeled after the Kafka consumer, where the same scenario happens: data is deleted irrespective of whether consumers are still consuming or not. If we want to change the implementation of the Pulsar topic reader to support a different semantic than today (e.g.: guaranteed data for the reader), that would be a longer discussion, and it would involve a re-design of the reader, which would have to be a "durable" resource instead of an ephemeral one. In my view, at that point we'd be essentially looking at an exclusive subscription with very tiny semantic differences.
@jerrypeng I am not convinced about this.
So as I said, we need to provide a flag to control the behavior. Only then will we be able to strengthen the reader semantics in the future. Dropping an existing behavior directly doesn't seem to be the right approach for now. Step back - what is the real concern preventing you from adding a flag?
Giving the false impression that data is retained, when in fact it is not. Hoping that a reconnection doesn't happen is not a guarantee. As I already said, the Reader concept was designed in this way. If we want to change or augment the semantics and the guarantees, a flag per se doesn't accomplish anything.
Again, as I said, there are different ways to guarantee the data retention. Time based is one, a subscription is another. What you're describing is essentially a reader that behaves like a subscription. Then why not just use a subscription if you want to make sure to read the data no matter what?
No. What I am describing is not a subscription. When people are using a reader, it is essentially saying: "I don't want Pulsar to manage cursors. I will be responsible for managing cursors." In this context, it is a reader, not a subscription. What I am trying to ensure is that if there is a reader actively reading events, we make sure of the continuity of the events it reads.
Step back. I am not going to argue about what should be the correct behavior, because both of us have our considerations behind it. What I have suggested, adding a flag, is a common process for changing existing behavior. The current behavior of a reader is that readers are included in the backlog quota. This is the behavior that we have delivered to the users. If we want to change the behavior, it is better to add a flag to keep the behavior consistent with before. It is a quite common practice in maintaining open source projects.
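For context on the Reader-versus-subscription distinction discussed in this exchange, a rough sketch with the Pulsar Java client is shown below; the topic, subscription name, and service URL are placeholders:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionType;

public class ReaderVsSubscription {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder service URL
                .build()) {

            // Ephemeral reader: the application tracks its own position, and data is
            // only available within the configured retention window.
            Reader<byte[]> reader = client.newReader()
                    .topic("persistent://public/default/my-topic")
                    .startMessageId(MessageId.earliest)
                    .create();

            // Durable exclusive subscription: the broker retains unacknowledged data
            // for this subscription, subject to backlog quotas.
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("persistent://public/default/my-topic")
                    .subscriptionName("my-durable-sub")
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();

            reader.close();
            consumer.close();
        }
    }
}
```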
@sijie I have added some documentation. Can you please check to see if it is appropriate? If it is, can we push this PR through as discussed?
/pulsarbot run-failure-checks
Force-pushed from 2bab931 to 360044f.
Fix the reader skipping compacted data when the original ledger has been removed (#12522): The compactor updates the compaction cursor (mark-delete) first and then updates the `compactionHorizon` of the compacted topic. While the compaction cursor moves forward, the original ledger will be removed if there are no other durable cursors. At the same time, if a reader is reading data from the original ledger, the reader will skip the data when the original ledger is removed; see #6787 for details. So the reader might skip the compacted data since the `compactionHorizon` has not been updated yet. The approach is:
1. Update the `compactionHorizon` before the compaction cursor moves forward, so that the reader will not skip the original data before the `compactionHorizon` is updated. If the broker crashes before the new compacted ledger ID has been persisted, then after the topic is loaded again, compaction can be triggered again without losing any data, but we will have an orphan ledger that cannot be deleted in the BookKeeper cluster.
2. Remove the previous compacted ledger only after the compaction cursor has moved forward and the new compacted ledger ID has been persisted. Otherwise, we might lose the compacted ledger if the broker crashes.
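As a reading aid for the ordering described in that fix, here is a self-contained sketch; the types and method names are hypothetical, not the actual Pulsar Compactor or CompactedTopic APIs:

```java
// Hypothetical interface standing in for the broker-side compacted-topic state.
final class CompactionFinalizerSketch {
    interface CompactedTopicState {
        void setCompactionHorizon(long horizonLedgerId, long newCompactedLedgerId);
        void markDeleteCompactionCursor(long horizonLedgerId);
        void deletePreviousCompactedLedger();
    }

    static void finalizeCompaction(CompactedTopicState state,
                                   long horizonLedgerId,
                                   long newCompactedLedgerId) {
        // 1. Publish the compaction horizon (with the new compacted ledger id) first,
        //    so readers are already redirected to the compacted data.
        state.setCompactionHorizon(horizonLedgerId, newCompactedLedgerId);
        // 2. Only then advance the compaction cursor; this may allow the original
        //    ledgers to be trimmed without readers skipping data.
        state.markDeleteCompactionCursor(horizonLedgerId);
        // 3. Delete the previously compacted ledger last, once the new compacted
        //    ledger id has been durably persisted.
        state.deletePreviousCompactedLedger();
    }
}
```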
[Compaction] Do not move the non-durable cursor position when trimming ledgers while the topic has compaction (#12602): For a non-durable cursor, the ledger trimming task will skip over the removed ledgers, to avoid readers introducing backlogs and to make sure the data can be removed once it is past retention; see #6787 for more details. But for a topic with compaction enabled, this leads to the reader skipping the compacted data. The newly added test illustrates this problem well. When reading compacted data, reading a message ID that is earlier than the first message ID of the original data is normal behavior, so we should not move forward a cursor that will read the compacted data.
Motivation
When using the Readers API, backlog quotas are still enforced on these ephemeral readers.
Modifications
If a cursor is non-durable, then we don't add it to the min-heap we use to keep track of the slowest cursor.
We use the min-heap to access the slowest cursor for backlog and quota calculations.
It is also used to determine up to which ledger we can trim. Thus, with this change, readers' ephemeral subscriptions won't prevent ledgers from being cleaned up, which is what you would expect the behavior to be.
There are some caveats to the above. Since the triggering of ledger trimming is piggybacked on top of ledger operations, if there is no new data coming in, ledgers will not be trimmed. The most recently closed ledger will be persisted past any retention. Perhaps in a subsequent PR, we can implement the trimming to be triggered on a schedule as well.
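A simplified, self-contained sketch of the idea (not the real ManagedCursorContainer implementation, which has its own heap and item bookkeeping): all cursors are registered, but only durable cursors enter the structure used to find the slowest cursor for backlog/quota and trimming decisions.

```java
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.StampedLock;

final class CursorContainerSketch {
    interface Cursor {
        String name();
        boolean isDurable();
        long readPosition(); // stand-in for the real mark-delete position
    }

    private final StampedLock rwLock = new StampedLock();
    private final Map<String, Cursor> cursors = new ConcurrentHashMap<>();
    private final PriorityQueue<Cursor> slowestHeap =
            new PriorityQueue<>((a, b) -> Long.compare(a.readPosition(), b.readPosition()));

    void add(Cursor cursor) {
        long stamp = rwLock.writeLock();
        try {
            cursors.put(cursor.name(), cursor);
            if (cursor.isDurable()) {
                // Reader (non-durable) cursors are intentionally excluded, so they
                // neither count toward backlog quotas nor hold back ledger trimming.
                slowestHeap.add(cursor);
            }
        } finally {
            rwLock.unlockWrite(stamp);
        }
    }

    Cursor getSlowestDurableCursor() {
        long stamp = rwLock.readLock();
        try {
            return slowestHeap.peek();
        } finally {
            rwLock.unlockRead(stamp);
        }
    }
}
```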
Please note this PR includes changes from:
#6769
Verifying this change
Added a test to verify that readers don't cause any backlog issues