Add message periodic refresh #1000

Closed
mgao0 wants to merge 20 commits into apache:master from mgao0:master

Conversation

@mgao0
Contributor

@mgao0 mgao0 commented May 11, 2020

Issues

  • My PR addresses the following Helix issues and references them in the PR description:

#996

Description

  • Here are some details about my PR, including screenshots of any UI changes:

This PR adds a scheduled periodic task that reads leftover messages from ZK.
There are several situations in which we need this periodic refresh:

  1. A read from ZK fails and no further event arrives to trigger another read
  2. A watcher is not registered properly, so it fails to trigger the callback
  3. A message is not processed correctly and is discarded, and no further event arrives to trigger another read

This PR does not fix the out-of-order problem, which is another issue that may cause missed messages. For example, we may miss an event because its callback comes in after a FINALIZE-type event, so it is ignored. Since this periodic refresh works by generating an event, the out-of-order issue may affect refresh events too.

The refresh is done in the callback handler and is triggered once a user-defined interval has passed since the last processed event. The refresh is achieved by queuing an empty callback-type event into the event processor.
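
As a rough illustration of the mechanism described above, the following Java sketch queues a synthetic event only after a full interval has passed with no processed event. The class `IdleRefreshSketch`, the `REFRESH_EVENT` marker, and the method names are hypothetical and are not taken from the Helix code base; timestamps are passed in explicitly so the behavior is easy to check.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a handler with a periodic refresh; not Helix code. */
final class IdleRefreshSketch {
  /** Marker for a synthetic "empty" callback event (assumed name). */
  static final String REFRESH_EVENT = "PERIODIC_REFRESH";

  private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
  private final long intervalMs;
  private long lastProcessedMs;

  IdleRefreshSketch(long intervalMs, long nowMs) {
    this.intervalMs = intervalMs;
    this.lastProcessedMs = nowMs;
  }

  /** Called whenever a real event has been processed; resets the idle clock. */
  synchronized void onEventProcessed(long nowMs) {
    lastProcessedMs = nowMs;
  }

  /** Queues a refresh event only if nothing was processed for a full interval. */
  synchronized boolean maybeRefresh(long nowMs) {
    if (nowMs - lastProcessedMs < intervalMs) {
      return false; // a recent event already covered this window
    }
    eventQueue.add(REFRESH_EVENT);
    lastProcessedMs = nowMs;
    return true;
  }

  /** Number of events currently waiting in the queue. */
  int queuedEvents() {
    return eventQueue.size();
  }
}
```

With an interval of 1000 ms, a call to `maybeRefresh` shortly after `onEventProcessed` does nothing, while a call a full interval later enqueues exactly one refresh event.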

Tests

  • The following tests are written for this issue:

TestPeriodicRefresh

  • The following is the result of the "mvn test" command on the appropriate module:
    Running

Commits

  • My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation (Optional)

  • In case of new functionality, my PR adds documentation in the following wiki page:

(Link the GitHub wiki you added)

Code Quality

  • My diff has been formatted using helix-style.xml
    (helix-style-intellij.xml if IntelliJ IDE is used)

@mgao0 mgao0 changed the title [WIP] Add message periodic refresh Add message periodic refresh May 12, 2020
@mgao0 mgao0 marked this pull request as ready for review May 12, 2020 21:59
Contributor

@jiajunwang jiajunwang left a comment

I see you have 2 similar PRs. And I think we are all reviewing the other one. Could you please specify which one is the right PR to review?
And you might need to refer to the comments there for this PR's change, unfortunately.

@mgao0
Contributor Author

mgao0 commented May 13, 2020

@jiajunwang 14 hours ago Contributor
Why do you need to call it here synchronously? I think it's simpler if we just put this event into the event queue.
That said, this logic would fit better at the HelixManager level. What I'm proposing is that if the HelixManager is initialized as a Participant, we start this timer thread there. It just injects a message event at a certain interval. Of course, if a real message event comes earlier, we reset the timer.

Putting this logic in the executor does not make sense to me.

@mgao0
Contributor Author

mgao0 commented May 13, 2020

@jiajunwang Sorry if it's confusing. This PR is for the periodic refresh. I have copied and pasted all your comments here and I will address them here.

@mgao0 mgao0 force-pushed the master branch 2 times, most recently from d4c167f to 00cb800 Compare May 26, 2020 23:12
@kaisun2000
Contributor

I just saw this diff. In fact, this is totally different from what it was before. I have serious reservations about adding periodic refresh to CallbackHandler.

CallbackHandler is the interface with zkclient that manages the lifecycle of a specific Helix property path. Specifically, it handles notifications (which call back into application logic) and registers for further notifications.

This part already has some notable issues (lost notifications, installing "leaked" paths, etc.) that should be addressed. We should not complicate it further here.

@mgao0
Contributor Author

mgao0 commented May 28, 2020

I just saw this diff. In fact, this is totally different from what it was before. I have serious reservations about adding periodic refresh to CallbackHandler.

CallbackHandler is the interface with zkclient that manages the lifecycle of a specific Helix property path. Specifically, it handles notifications (which call back into application logic) and registers for further notifications.

This part already has some notable issues (lost notifications, installing "leaked" paths, etc.) that should be addressed. We should not complicate it further here.

Doing the periodic refresh in the callback handler can actually resolve the issues you mentioned, such as lost notifications or missing watchers. The periodic refresh logic is independent: it runs in a separate thread and puts an event in the event queue only if no event has been processed for a while. So if the callback handler is functioning well and events are processed normally, it won't add much performance overhead, since no periodic refresh event will be generated.
Do you have any specific concerns about the periodic refresh?

// so we won't have a memory leakage when we cancel scheduled task
_periodicRefreshExecutor.setRemoveOnCancelPolicy(true);
_scheduledRefreshFuture = _periodicRefreshExecutor
.scheduleWithFixedDelay(new RefreshTask(), _periodicRefreshInterval,

Contributor

I think we talked about at least 2 of the following points:

  1. If the callback is triggered, the timer should be reset and then wait until the interval passes again.
  2. If the queue is not empty, then we don't need to trigger the refresh.
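
The two conditions above could be expressed as a small guard. This is only a sketch under assumptions: `TriggerGuardSketch`, `remainingMs`, and `shouldTrigger` are made-up names, and the pending-event count stands in for whatever the real event queue exposes.

```java
/** Hypothetical guard for the periodic trigger; names are illustrative only. */
final class TriggerGuardSketch {
  private TriggerGuardSketch() {}

  /** Time left before the interval since the last invoke has fully elapsed. */
  static long remainingMs(long lastInvokeMs, long intervalMs, long nowMs) {
    return lastInvokeMs + intervalMs - nowMs;
  }

  /**
   * Trigger only when (1) a full interval has passed since the last callback,
   * and (2) no event is already waiting in the queue.
   */
  static boolean shouldTrigger(long lastInvokeMs, long intervalMs, long nowMs,
      int pendingEvents) {
    return pendingEvents == 0 && remainingMs(lastInvokeMs, intervalMs, nowMs) <= 0;
  }
}
```

Resetting the timer in condition 1 then amounts to updating `lastInvokeMs` whenever a real callback runs, so the guard stays false for another full interval.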

Contributor

They need to be done to avoid unnecessary performance impact.

Contributor Author

@jiajunwang The previous PR contains the first condition you mentioned. I added the second condition in the updated PR. Both checks are still done in the callback handler instead of the ZK event queue.
As per our discussion on Friday, I tried to do this in the ZKEvent queue and found it hard to do: when an event comes in at the ZKEvent level, we don't know its change type, so we cannot reset the timer accordingly. Currently no Helix logic is involved at the ZKEvent level, so I think it's better to keep it that way.
As for your concern about parallel invokes, we synchronize on the Helix manager in the invoke method, so it should be fine.
Let's discuss more if you still have concerns about this.

/**
* This class tests that if there are no incoming events, the onMessage method in message listener will be called by message periodic refresh
*/
public class TestPeriodicRefresh extends ZkUnitTestBase {

Contributor

@kaisun2000 since you raise this concern, please review this test case and verify if this is enough or not : )

@jiajunwang
Contributor

Thanks for the update. Could you please add more information to the description about which issue this refresh is trying to address? We discussed several scenarios that cannot be addressed by the other fixes, and then decided to proceed with this PR. But let's make it clear what the missing part is.

Contributor

@jiajunwang jiajunwang left a comment

"Refresh" to "Trigger" for all the parameters for consistency?
Or we can call it "periodicCallbackInterval"? Then scheduleCallbackFuture, etc.

wait(remainingTime);
}
}
} catch (Exception e) {

Contributor

For an InterruptedException, we may want to exit the execution.

Contributor Author

I have made changes to handle InterruptedException separately from other exceptions. I now interrupt the thread when an InterruptedException is caught. Is this what you wanted?
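
For reference, a common way to handle this in Java is to catch `InterruptedException` on its own, restore the interrupt flag, and leave the loop, while other failures are reported without ending the loop. The sketch below is an assumption-laden illustration, not the PR's code: `InterruptAwareLoopSketch` is a made-up class, and it uses `Thread.sleep` where the diff uses `wait(remainingTime)`.

```java
/** Hypothetical periodic loop that exits cleanly on interruption. */
final class InterruptAwareLoopSketch implements Runnable {
  private final long intervalMs;
  private final Runnable trigger;

  InterruptAwareLoopSketch(long intervalMs, Runnable trigger) {
    this.intervalMs = intervalMs;
    this.trigger = trigger;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Thread.sleep(intervalMs);
        trigger.run();
      } catch (InterruptedException e) {
        // Restore the flag so callers can observe the interruption, then exit.
        Thread.currentThread().interrupt();
        return;
      } catch (RuntimeException e) {
        // Any other failure: report it and keep the loop alive.
        System.err.println("Periodic trigger failed: " + e);
      }
    }
  }
}
```

Interrupting the thread that runs this loop, for example through `Thread.interrupt()` or `ExecutorService.shutdownNow()`, ends it even when the interval is long.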

_batchCallbackProcessor.resetEventQueue();
if (_scheduledRefreshFuture != null) {
_periodicRefreshExecutor.shutdownNow();
initRefreshTask();

Contributor

The next event would be FINALIZE, so even if the periodic task is triggered, it won't really invoke the handling logic successfully. I think we should not init here.

Shall we do it in the init() call?

Contributor Author

I have updated the initTriggerTask() method to include reset as well. Please take a look. Thanks!

Contributor

@jiajunwang jiajunwang left a comment

Looks good in general. Some minor comments.
Meanwhile, I think we are still discussing the default configuration. Let me approve after that decision has been made.

}
// When cancelling the task future, it removes the task from the queue
// so we won't have a memory leakage when we cancel scheduled task
_periodicTriggerExecutor.setRemoveOnCancelPolicy(true);

Contributor

Shall we only set it once after the fresh construct?

Contributor Author

Good point. Updated.
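
A minimal sketch of setting the policy once, right after the executor is constructed, might look like the following. `RemoveOnCancelSketch` and its methods are hypothetical; only `ScheduledThreadPoolExecutor`, `setRemoveOnCancelPolicy`, `schedule`, `getQueue`, and `shutdownNow` are standard `java.util.concurrent` calls.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing the remove-on-cancel policy set at construction. */
final class RemoveOnCancelSketch {
  private RemoveOnCancelSketch() {}

  /** Builds the executor and sets the policy exactly once. */
  static ScheduledThreadPoolExecutor newExecutor() {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    // Cancelled tasks are removed from the work queue immediately,
    // instead of lingering until their delay expires.
    executor.setRemoveOnCancelPolicy(true);
    return executor;
  }

  /** Schedules a far-future task, cancels it, and reports the queue size. */
  static int queueSizeAfterCancel() {
    ScheduledThreadPoolExecutor executor = newExecutor();
    try {
      Runnable noop = () -> {};
      ScheduledFuture<?> future = executor.schedule(noop, 1, TimeUnit.HOURS);
      future.cancel(false);
      return executor.getQueue().size();
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Because the policy lives on the executor rather than on each task, later scheduling calls do not need to set it again.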

subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
}

if (_periodicTriggerExecutor != null) {

Contributor

Can we put this in the initTriggerTask() method?

Contributor Author

This is to prevent the update of last invoke time for the cases where periodic trigger is not enabled. I don't think I can move this to init?

Contributor

Let's put it at the very beginning of this method?

* Used to initialize a periodic triggered task
* Schedule tasks in a task executor with fixed intervals
*/
private void initTriggerTask() {

Contributor

nit, take _periodicTriggerInterval as the parameter, and don't refer to the global private field. This is a good style in which you can reduce the impact of a private field in the class. So less dependency if you want to make any further change.

Contributor Author

Great suggestion! Fixed. Thanks!

_periodicTriggerExecutor.setRemoveOnCancelPolicy(true);
}
_scheduledTriggerFuture = _periodicTriggerExecutor
.scheduleWithFixedDelay(new TriggerTask(), _periodicTriggerInterval,

Contributor

I overlooked this part before. Why do you need to use scheduleWithFixedDelay() here? The TriggerTask object will keep running in the thread, right?

Contributor Author

Discussed offline. A single long running thread is sufficient. Updated.

try {
while (true) {
long currentTime = System.currentTimeMillis();
long remainingTime = _lastInvokeTime + _periodicTriggerInterval - currentTime;

Contributor

To reduce the usage of _periodicTriggerInterval in the whole callback class, could you please pass the interval as a constructor parameter? That would be a cleaner design.

Contributor Author

Updated.

subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
}

if (_periodicTriggerExecutor != null) {

Contributor

Let's put it at the very beginning of this method?

@mgao0 mgao0 force-pushed the master branch 2 times, most recently from 5d36f68 to bb9bd30 Compare June 25, 2020 23:28
@jiajunwang
Contributor

Closing due to inactivity.

@jiajunwang jiajunwang closed this May 7, 2021
7 participants