[SPARK-10649][STREAMING] Prevent inheriting job group and irrelevant job description in streaming jobs #8781
Conversation
@andrewor14 Could you take a look at the core changes in this patch?
ThreadUtils.runInNewThread("streaming-start") {
  sparkContext.setCallSite(startSite.get)
  sparkContext.setJobGroup(
    StreamingContext.STREAMING_JOB_GROUP_ID, StreamingContext.STREAMING_JOB_DESCRIPTION, false)
I am forced to set a specific job group and description, because there is no way to remove them (because of the default property in the local properties).
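The mechanism being worked around here can be demonstrated without Spark: the job group and description live in inheritable thread-local properties, so any thread created after they are set receives a copy. A minimal plain-Scala sketch (the property key below is illustrative, not necessarily Spark's actual internal key):

```scala
// Sketch of why job-group properties leak into streaming jobs: they sit
// in an InheritableThreadLocal, so a thread spawned after they are set
// receives a copy even though it never set them itself.
object InheritanceDemo {
  private val props = new InheritableThreadLocal[Map[String, String]] {
    override def initialValue(): Map[String, String] = Map.empty
  }

  def main(args: Array[String]): Unit = {
    // The "user" thread sets a job group before spawning anything.
    props.set(Map("spark.jobGroup.id" -> "user-group"))

    var inherited: Map[String, String] = null
    val child = new Thread(() => { inherited = props.get() })
    child.start(); child.join()

    // The child sees the parent's job group.
    assert(inherited("spark.jobGroup.id") == "user-group")
    println("child inherited: " + inherited)
  }
}
```

This is why simply "not setting" the properties in the starting thread is not enough: the values are already present and will be copied into every child.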
Test build #42546 has finished for PR 8781 at commit
Test build #42549 has finished for PR 8781 at commit
Test build #42552 has finished for PR 8781 at commit
Test build #42550 has finished for PR 8781 at commit
Test build #1767 has finished for PR 8781 at commit
Test build #1768 has finished for PR 8781 at commit
Test build #42565 has finished for PR 8781 at commit
  runInNewThread("thread-name") { throw new IllegalArgumentException("test") }
}
assert(exception.isInstanceOf[IllegalArgumentException])
assert(exception.asInstanceOf[IllegalArgumentException].getMessage.contains("test"))
You can update these 5 lines to:
val exception = intercept[IllegalArgumentException] {
runInNewThread("thread-name") { throw new IllegalArgumentException("test") }
}
assert(exception.getMessage.contains("test"))
That's true!
LGTM except some nits
Test build #42600 has finished for PR 8781 at commit
@andrewor14 Will you be able to take a look? This is blocking other PRs.
ThreadUtils.runInNewThread("streaming-start") {
  sparkContext.setCallSite(startSite.get)
  sparkContext.clearJobGroup()
  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
Is it necessary to set SPARK_JOB_INTERRUPT_ON_CANCEL to false, considering clearJobGroup will clean it?
agreed
Basically, I don't want to rely on the default value of this parameter, and want to explicitly specify that jobs should not be interrupted.
Test build #42658 has finished for PR 8781 at commit
@@ -610,7 +614,7 @@ class StreamingContext private[streaming] (
     assert(env.metricsSystem != null)
     env.metricsSystem.registerSource(streamingSource)
     uiTab.foreach(_.attach())
-    logInfo("StreamingContext started")
+    this.logInfo("StreamingContext started")
not needed?
My bad. Forgot to remove that.
StreamingContext.ACTIVATION_LOCK.synchronized {
  StreamingContext.assertNoOtherContextIsActive()
  try {
    validate()
    scheduler.start()
    ThreadUtils.runInNewThread("streaming-start") {
      sparkContext.setCallSite(startSite.get)
Any reason why we need to move this in here? It's an atomic reference, so it doesn't really matter which thread reads it, right?
Because this sets a thread local variable.
an inheritable thread local variable, so it still doesn't matter
(anyway we can just keep this change, not a big deal, mainly just wondering)
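The point about inheritable thread-locals can be made concrete: the child's copy is taken when the Thread object is *constructed*. So as long as the call site is set before the new thread is created, the child sees it either way, which is why moving the call strictly doesn't matter. A standalone sketch (names illustrative):

```scala
// InheritableThreadLocal values are snapshotted into a child thread at
// construction time, not at start() time. A value set in the parent
// before the child is constructed is inherited; a later update is not.
object InheritTimingDemo {
  private val site = new InheritableThreadLocal[String]

  def main(args: Array[String]): Unit = {
    site.set("before-construction")
    var seen: String = null
    val child = new Thread(() => { seen = site.get })
    site.set("after-construction") // too late: child already holds its copy
    child.start(); child.join()
    assert(seen == "before-construction")
    println("child sees: " + seen)
  }
}
```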
@tdas this looks good. Once you address the comments I can merge this.
LGTM
Test build #42764 has finished for PR 8781 at commit
Test build #42768 has finished for PR 8781 at commit
I am merging this patch to master. I will issue a separate PR for branch 1.5. Thanks @andrewor14 and @zsxwing for reviewing.
… job description in streaming jobs

The job group and job description information is passed through thread-local properties and is inherited by child threads. In the case of Spark Streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(). This may not make sense:

1. Job group: This is mainly used for cancelling a group of jobs together. It does not make sense to cancel streaming jobs like this, as the effect would be unpredictable. It is not a valid use case anyway; to cancel a streaming context, call streamingContext.stop().
2. Job description: This is used to attach readable text descriptions to jobs shown in the UI. The job description of the thread that calls streamingContext.start() is not useful for the streaming jobs, as it does not make sense for all of them to share the same description, and that description may or may not be related to streaming.

The solution in this PR is meant for the Spark master branch, where local properties are inherited by cloning the properties. The job group and job description in the thread that starts the streaming scheduler are explicitly removed, so that subsequent child threads do not inherit them. Also, the start is done in a new child thread, so that setting the job group and description for streaming does not change those properties in the thread that called streamingContext.start().

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#8781 from tdas/SPARK-10649.
Test build #42769 timed out for PR 8781 at commit
… job description in streaming jobs

**Note that this PR is only for branch 1.5. See #8781 for the solution for Spark master.**

The job group and job description information is passed through thread-local properties and is inherited by child threads. In the case of Spark Streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(). This may not make sense:

1. Job group: This is mainly used for cancelling a group of jobs together. It does not make sense to cancel streaming jobs like this, as the effect would be unpredictable. It is not a valid use case anyway; to cancel a streaming context, call streamingContext.stop().
2. Job description: This is used to attach readable text descriptions to jobs shown in the UI. The job description of the thread that calls streamingContext.start() is not useful for the streaming jobs, as it does not make sense for all of them to share the same description, and that description may or may not be related to streaming.

The solution in this PR is meant for Spark branch 1.5, where local properties are inherited by cloning the properties only when the Spark config `spark.localProperties.clone` is set to `true` (see #8781 for the PR for the Spark master branch). Similar to the approach taken by #8721, StreamingContext sets that configuration to true, which ensures that all subsequent child threads get a cloned copy of the thread-local properties. This allows the job group and job description to be explicitly removed in the thread that starts the streaming scheduler, so that subsequent child threads do not inherit them. Also, the start is done in a new child thread, so that setting the job group and description for streaming does not change those properties in the thread that called streamingContext.start().

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8856 from tdas/SPARK-10649-1.5.
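The overall approach described in the commit messages can be sketched without Spark: do the start in a fresh named thread, clear the (cloned) inherited properties there, and leave the caller's thread-locals untouched. `runInNewThread` below is a simplified stand-in for Spark's `ThreadUtils.runInNewThread`, and the property names are illustrative:

```scala
import scala.collection.mutable

object StartInNewThreadSketch {
  // Inheritable properties; each child gets a cloned copy, mimicking the
  // master-branch behavior the commit message relies on.
  private val props = new InheritableThreadLocal[mutable.Map[String, String]] {
    override def initialValue(): mutable.Map[String, String] = mutable.Map.empty
    override def childValue(parent: mutable.Map[String, String]): mutable.Map[String, String] =
      parent.clone()
  }

  /** Run `body` in a named thread and rethrow any exception in the caller. */
  def runInNewThread(name: String)(body: => Unit): Unit = {
    var error: Throwable = null
    val t = new Thread(name) {
      override def run(): Unit =
        try body catch { case e: Throwable => error = e }
    }
    t.start(); t.join()
    if (error != null) throw error
  }

  def main(args: Array[String]): Unit = {
    // The caller (like the thread invoking streamingContext.start())
    // has a job group and description set.
    props.get ++= Seq("jobGroup" -> "user-group", "jobDesc" -> "user work")

    runInNewThread("streaming-start") {
      // Clear the inherited values in the child's cloned copy only.
      props.get -= "jobGroup"
      props.get -= "jobDesc"
      assert(!props.get.contains("jobGroup"))
    }

    // The caller's own properties are untouched.
    assert(props.get()("jobGroup") == "user-group")
    println("caller still has: " + props.get)
  }
}
```

Because the clearing happens in the child's cloned copy, every job-launching thread spawned from "streaming-start" sees clean properties, while the user's thread keeps its own job group and description.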