-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-20223] The RecreateOnResetOperatorCoordinator and SourceCoordi… #14143
Conversation
…nator executor thread should use the user class loader.
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 2dd03fd (Thu Nov 19 18:07:05 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The basic fix is good, but I would extend this with passing the User Code Classloader directly. It is fragile to assume that the thread calling the creation (a scheduler thread) has the usercode class loader always.
I can take this over and make the remaining changes.
private Thread t; | ||
|
||
CoordinatorExecutorThreadFactory(String coordinatorThreadName) { | ||
this.coordinatorThreadName = coordinatorThreadName; | ||
this.t = null; | ||
this.cl = Thread.currentThread().getContextClassLoader(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be safer to pass the user code classloader here directly, rather than relying on the context class loader. That seems fragile.
@@ -55,6 +57,7 @@ private RecreateOnResetOperatorCoordinator( | |||
long closingTimeoutMs) throws Exception { | |||
this.context = context; | |||
this.provider = provider; | |||
this.userClassLoader = Thread.currentThread().getContextClassLoader(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
@StephanEwen Thanks for the suggestion. Passing in the user class loader is indeed more explicit. |
Merged as part of this change 40bbc17 |
…nator executor thread should use the user class loader.
What is the purpose of the change
This patch fixes two class loader related problems:
RecreateOnResetOperatorCoordinator
does not use the user class loader when creating the internal coordinator or calling start.ClassNotFoundException
could be thrown due to this. The patch fixes this issue by creating and starting the internal operator coordinator using the user class loader.SplitEnumerator
, it should run with user class loader as well.Brief change log
RecreateOnResetOperatorCoordinator
to create and start internal coordinators in user class loader.Verifying this change
Unit tests have been added / modified to test the change.
RecreateOnResetOperatorCoordinatorTest
.SourceCoordinatorProviderTest.testUserClassLoaderInCoordinatorExecutor()
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation