Trigger rollover when meeting maxLedgerRolloverTimeMinutes #7111
    /**
     * Roll current ledger if it is full
     */
    void rollCurrentLedgerIfFull();
Why do we need to expose this in the interface? It would be better to keep it as an implementation detail.
I'd like to ask your advice here, because I am not sure whether doing a type cast in `BrokerService` is a suitable implementation. Maybe I can change it to:
((ManagedLedgerImpl) managedLedger).rollCurrentLedgerIfFull();
I was thinking that this is a managedLedger internal task, which the `BrokerService` shouldn't be concerned with. It would be better to handle it in `ManagedLedgerFactoryImpl`: go through all open managed ledger instances and check whether a rollover has to be forced.
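A minimal sketch of that suggestion, under stated assumptions: the nested `Ledger`, `LedgerImpl` and `Factory` types are hypothetical stand-ins, not the real `ManagedLedger`, `ManagedLedgerImpl` or `ManagedLedgerFactoryImpl`, and the rollover rule is reduced to a simple age check. The point is only that a factory which owns the implementations can run the check without adding a method to the public interface and without a cast.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FactoryRolloverSketch {

    /** Public view of a managed ledger: no rollover method is exposed here. */
    interface Ledger {
        String getName();
    }

    /** Hypothetical implementation that keeps the rollover check to itself. */
    static class LedgerImpl implements Ledger {
        private final String name;
        private final long maxAgeMillis;
        private long createdAtMillis;
        private int rollovers;

        LedgerImpl(String name, long maxAgeMillis, long nowMillis) {
            this.name = name;
            this.maxAgeMillis = maxAgeMillis;
            this.createdAtMillis = nowMillis;
        }

        @Override
        public String getName() {
            return name;
        }

        /** Implementation detail: roll over once the current ledger is too old. */
        synchronized void rollCurrentLedgerIfFull(long nowMillis) {
            if (nowMillis - createdAtMillis >= maxAgeMillis) {
                createdAtMillis = nowMillis; // pretend a fresh ledger was opened
                rollovers++;
            }
        }

        synchronized int rollovers() {
            return rollovers;
        }
    }

    /** Hypothetical factory: it stores the implementations, so no cast is needed. */
    static class Factory {
        private final Map<String, LedgerImpl> ledgers = new ConcurrentHashMap<>();

        Ledger open(String name, long maxAgeMillis, long nowMillis) {
            return ledgers.computeIfAbsent(name, n -> new LedgerImpl(n, maxAgeMillis, nowMillis));
        }

        /** Body of the periodic task: visit every open ledger and let it decide. */
        void checkRollovers(long nowMillis) {
            ledgers.values().forEach(ml -> ml.rollCurrentLedgerIfFull(nowMillis));
        }

        int rolloversOf(String name) {
            return ledgers.get(name).rollovers();
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        factory.open("topic-a", 1_000L, 0L);
        factory.open("topic-b", 5_000L, 0L);
        factory.checkRollovers(1_500L);
        System.out.println("topic-a rollovers: " + factory.rolloversOf("topic-a"));
        System.out.println("topic-b rollovers: " + factory.rolloversOf("topic-b"));
    }
}
```

Callers only ever see the `Ledger` interface, so the rollover check stays an implementation detail of the managed ledger layer.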
Now I feel a little confused. If we treat rollover as an internal task, should `consumedLedgersMonitor` or `backlogQuotaChecker` be internal tasks as well? Or is my understanding biased?
@wuzhanpeng I think you can open an issue to track the monitors that currently live outside the managed ledger but should be maintained by the managed ledger itself.
        }

        ledgerClosed(lh);
        createLedgerAfterClosed();
We don't need to initiate the creation of a ledger at this point. We can stay in the LedgerClosed state until a new write comes in.
Yes, with the current code we can stay in the `closed` state until the next entry comes in. However, if we choose to wait, the last ledger created for the topic will never be removed, because the trimming strategy does not remove the current ledger. In that scenario we keep a lot of useless data if the topic is no longer being used. Moreover, it may cause disk problems if the cluster holds a large number of abandoned topics.
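To illustrate the argument, here is a simplified model, not Pulsar's actual trimming code. It assumes trimming removes ledgers older than a retention period but always skips the current (newest) one; `TrimSketch` and its methods are hypothetical names. With a single ledger nothing can be trimmed, however old the data is. Once a new ledger has been created, the old one becomes eligible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TrimSketch {
    // ledgerId -> creation time in millis; the highest id is the current ledger
    private final TreeMap<Long, Long> ledgers = new TreeMap<>();
    private long nextId = 0;

    /** Opens a new ledger, which becomes the current one. */
    void createLedger(long nowMillis) {
        ledgers.put(nextId++, nowMillis);
    }

    /** Removes expired ledgers, but never the current one. Returns the removed ids. */
    List<Long> trim(long nowMillis, long retentionMillis) {
        List<Long> removed = new ArrayList<>();
        if (ledgers.isEmpty()) {
            return removed;
        }
        long current = ledgers.lastKey();
        // headMap(current) is exclusive, so the current ledger is never considered
        for (Map.Entry<Long, Long> e : ledgers.headMap(current).entrySet()) {
            if (nowMillis - e.getValue() >= retentionMillis) {
                removed.add(e.getKey());
            }
        }
        removed.forEach(ledgers::remove);
        return removed;
    }

    public static void main(String[] args) {
        TrimSketch topic = new TrimSketch();
        topic.createLedger(0L);
        System.out.println("idle topic, one ledger: " + topic.trim(10_000L, 1_000L));
        topic.createLedger(10_000L); // rollover: a fresh current ledger
        System.out.println("after rollover: " + topic.trim(10_000L, 1_000L));
    }
}
```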
Ok, I think we can change the current logic. If the current ledger is closed, we can delete it. I'm not sure whether there are any problems with this change. @merlimat Could you please help check this?
@@ -277,6 +278,8 @@ public BrokerService(PulsarService pulsar) throws Exception {
                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
        this.consumedLedgersMonitor = Executors
                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
        this.ledgerFullMonitor =
This thread is not being stopped. Potentially we could also reuse an existing executor.
+1 @wuzhanpeng can we re-use an existing executor? Otherwise, we end up creating a lot of executors.
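A rough sketch of what reusing an executor could look like, using only `java.util.concurrent`. The class and variable names are illustrative and not taken from `BrokerService`: two periodic checks are scheduled on one `ScheduledExecutorService` instead of creating a thread per monitor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SharedMonitorExecutorSketch {

    /** Schedules two checks on one executor; returns true if each ran at least once. */
    static boolean runBothOnOneExecutor() {
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch consumedLedgersRan = new CountDownLatch(1);
        CountDownLatch ledgerFullRan = new CountDownLatch(1);
        try {
            // Stand-ins for the consumed-ledgers check and the ledger-full check
            monitor.scheduleAtFixedRate(consumedLedgersRan::countDown, 0, 10, TimeUnit.MILLISECONDS);
            monitor.scheduleAtFixedRate(ledgerFullRan::countDown, 0, 10, TimeUnit.MILLISECONDS);
            return consumedLedgersRan.await(2, TimeUnit.SECONDS)
                    && ledgerFullRan.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            monitor.shutdownNow(); // one executor to stop instead of two
        }
    }

    public static void main(String[] args) {
        System.out.println("both checks ran: " + runBothOnOneExecutor());
    }
}
```

The trade-off is that tasks sharing a single thread run one after another, so a slow check delays the others.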
@@ -485,6 +489,12 @@ protected void startConsumedLedgersMonitor() {
        }
    }

    protected void startLedgerFullMonitor() {
        int interval = pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
        ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),
The recurring task needs to be cancelled when `BrokerService` is closed.
I have added a shutdown for the monitor when `BrokerService` is closed.
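A minimal sketch of such a lifecycle, assuming a hypothetical `MonitorLifecycleSketch` class rather than the actual `BrokerService` code. The `ScheduledFuture` returned by `scheduleAtFixedRate` is kept so that `close()` can cancel the recurring task before shutting the executor down.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MonitorLifecycleSketch implements AutoCloseable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger runs = new AtomicInteger();
    private ScheduledFuture<?> task;

    /** Starts the recurring check; here the "check" only counts its invocations. */
    void start(long intervalMillis) {
        task = executor.scheduleAtFixedRate(runs::incrementAndGet, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Cancels the recurring task, then stops the executor thread. */
    @Override
    public void close() {
        if (task != null) {
            task.cancel(false); // let a run that is in progress finish
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    int runs() {
        return runs.get();
    }

    boolean isStopped() {
        return task != null && task.isCancelled() && executor.isTerminated();
    }

    public static void main(String[] args) throws InterruptedException {
        MonitorLifecycleSketch monitor = new MonitorLifecycleSketch();
        monitor.start(5);
        Thread.sleep(50);
        monitor.close();
        System.out.println("stopped: " + monitor.isStopped());
    }
}
```

If the task runs on a shared executor that outlives the service, cancelling the `ScheduledFuture` is the step that actually stops it. Shutting down a dedicated executor would stop its periodic tasks anyway under the default policy.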
Move to 2.7.0 first.
@merlimat Would you please help review it again?
…en_triggering_maxLedgerRolloverTimeMinutes
The change looks good to me, I just left a minor comment.
This pull request resolves apache#7184

### Motivation

This pull request implements a monitor thread that checks whether the current topic ledger has reached the limit set by `managedLedgerMaxLedgerRolloverTimeMinutes` and, if so, triggers a rollover so that the configuration takes effect.

Another important idea: when we trigger a rollover, we close the current ledger, so its storage can be released as well. For rarely used topics, the data in the current ledger is likely to have expired, but the existing rollover logic is only triggered when a new entry is added. This wastes disk space.

### Expected behaviors

The monitor thread is scheduled at a fixed interval, which is set to `managedLedgerMaxLedgerRolloverTimeMinutes`. Each inspection checks two conditions together: `currentLedgerEntries > 0` and `currentLedgerIsFull()`. When the current ledger has no entries, no rollover is triggered, which avoids creating unnecessary ledgers.

### Modifications

- The main modification took place in `ManagedLedgerImpl`
- In addition, a check thread was added in the `BrokerService`

maybe related to apache#6935
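The two-condition check described above can be sketched as follows. This is an illustration under assumptions, not the code in `ManagedLedgerImpl`: the class name, method names and parameters are hypothetical, and `currentLedgerIsFull` here considers only the time limit.

```java
import java.util.concurrent.TimeUnit;

class RolloverCheckSketch {

    /** Stand-in for currentLedgerIsFull(): true once the ledger is older than the limit. */
    static boolean currentLedgerIsFull(long createdAtMillis, long nowMillis, int maxRolloverTimeMinutes) {
        return nowMillis - createdAtMillis >= TimeUnit.MINUTES.toMillis(maxRolloverTimeMinutes);
    }

    /** Rolls over only when the ledger holds entries AND has reached the limit. */
    static boolean shouldRollover(long currentLedgerEntries, long createdAtMillis,
                                  long nowMillis, int maxRolloverTimeMinutes) {
        return currentLedgerEntries > 0
                && currentLedgerIsFull(createdAtMillis, nowMillis, maxRolloverTimeMinutes);
    }

    public static void main(String[] args) {
        long tenMinutes = TimeUnit.MINUTES.toMillis(10);
        System.out.println("empty, expired ledger: " + shouldRollover(0, 0L, tenMinutes, 5));
        System.out.println("non-empty, expired ledger: " + shouldRollover(3, 0L, tenMinutes, 5));
    }
}
```

Skipping empty ledgers means an idle topic is not rolled again on every tick of the monitor.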
…0087)

This pull request resolves #10086

## Motivation

Although #7111 and #9136 respectively solved the problems that

1. the current ledger is full and cannot be rolled
2. a cursor is subscribed to an expired ledger, which makes the cleaning thread unable to clean up

there is still a gap. When we close and reopen a managed ledger (for example, when a broker shuts down unexpectedly or a topic is unloaded due to rebalancing), the managed ledger creates an empty ledger after initialization, which causes the current cleanup logic to fail to take effect. Therefore I think we need a more unified entry point to solve the cursor update problem, and then further solve the problem of clearing expired ledgers.

## Expected Behavior

1. Able to clean up expired data after the managed ledger is reopened
2. A more unified entry point to implement cursor updates

## Modification

1. Move the cursor update logic to `internalTrimConsumedLedgers` from `ManagedLedgerImpl`'s callback `createComplete`
2. Update `lastConfirmedEntry` if necessary when updating cursors