[SPARK-20652][sql] Store SQL UI data in the new app status store. #19681
Conversation
This change replaces the SQLListener with a new implementation that saves the data to the same store used by the SparkContext's status store. For that, the types used by the old SQLListener had to be updated a bit so that they're more serialization-friendly.

The interface for getting data from the store was abstracted into a new class, SQLAppStatusStore (following the convention used in core).

Another change is the way that the SQL UI hooks up into the core UI or the SHS. The old "SparkHistoryListenerFactory" was replaced with a new "AppStatePlugin" that more explicitly differentiates between the two use cases: processing events, and showing the UI. Both live apps and the SHS use this new API (previously, it was restricted to the SHS).

Note on the above: this causes a slight change of behavior for live apps; the SQL tab will only show up after the first execution is started.

The metrics gathering code was re-worked a bit so that the types used are less memory hungry and more serialization-friendly. This reduces memory usage when using in-memory stores, and reduces load times when using disk stores.

Tested with existing and added unit tests. Note one unit test was disabled because it depends on SPARK-20653, which isn't in yet.
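For readers new to the plugin mechanism, here is a rough sketch of its shape. The plugin appears as AppStatusPlugin in the diff below (the description calls it "AppStatePlugin"), and the method names come from the review discussion further down; treat the signatures as an approximation, not the PR's literal code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.kvstore.KVStore

// Approximate shape of the plugin interface described above -- a sketch, not the literal source.
private[spark] trait AppStatusPlugin {

  // Called whenever a listener bus is being set up, both for live applications and for the
  // SHS replaying an event log, so the plugin can register listeners that write into the
  // shared status store.
  def setupListeners(
      conf: SparkConf,
      store: KVStore,
      addListenerFn: SparkListener => Unit,
      live: Boolean): Unit

  // Called when a SparkUI is being built, so the plugin can attach its tab(s).
  def setupUI(ui: SparkUI): Unit
}
```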
For context:
Note I took this PR out of its original order in my repo to speed up reviews. That means it is not exactly the same code as in there - I'll need to clean up the …
Test build #83521 has finished for PR 19681 at commit
Test build #83525 has finished for PR 19681 at commit
looks reasonable but I need to spend more time understanding the old SQL UI first ...
    ui: Option[SparkUI] = None)
  extends SparkListener with Logging {

  // How often to flush intermediate statge of a live execution to the store. When replaying logs,
typo: statge
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.kvstore.KVStore

private[sql] class SQLAppStatusListener( |
I don't see calls to update in onTaskEnd or onExecutorMetricsUpdate. Does that mean the live UI won't update till a stage is finished? But after looking at the tests, I guess I'm wrong, it does update ... where is the update I'm missing?
The live UI is updated from the info in LiveStageMetrics, which is not written to the store. That's kept in memory while executions are running, and aggregated into the final metrics view when the execution finishes (see aggregateMetrics).

SQLAppStatusStore.executionMetrics has logic to call the listener directly when the final metrics are not yet computed.
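For illustration, that fallback looks roughly like the snippet below; it mirrors the SQLAppStatusStore.executionMetrics code that shows up later in the diff, with the return type inferred from how the metrics are used.

```scala
def executionMetrics(executionId: Long): Map[Long, String] = {
  val exec = store.read(classOf[SQLExecutionUIData], executionId)
  Option(exec.metricValues)                                  // final, aggregated metrics
    .orElse(listener.map(_.executionMetrics(executionId)))   // execution still live: ask the listener
    .getOrElse(Map())
}
```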
SQLAppStatusStore.executionMetrics has logic to call the listener directly when the final metrics are not yet computed.
ahh, that is the part I hadn't noticed. thanks
import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}

import org.apache.spark.util.kvstore.InMemoryStore

class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { |
rename test to SQLAppStatusListenerSuite
Actually this suite has a mix of tests for code in the (current) listener source file and for stuff that's not related to those, which would belong in SQLAppStatusListenerSuite. My original changes broke this into two different suites, but I chose to postpone that to reduce the size of the diff for now (and also to make the diff a little easier to read).
Test build #83563 has finished for PR 19681 at commit
retest this please
Test build #83567 has started for PR 19681 at commit
val now = System.nanoTime()
if (exec.endEvents >= exec.jobs.size + 1) {
  liveExecutions.remove(exec.executionId)
  exec.write(kvstore, now)
If a job was killed mid-execution, so the event log didn't contain the end events, you wouldn't get to see any info for that execution, would you? Don't you need some final flush?
When you kill a job, isn't an end event generated?
This is trying to mimic the behavior from the old listener:
jobEnd.jobResult match {
case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED
case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED
}
if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) {
// We are the last job of this execution, so mark the execution as finished. Note that
// `onExecutionEnd` also does this, but currently that can be called before `onJobEnd`
// since these are called on different threads.
markExecutionFinished(executionId)
}
So in that case, if there's no end event for the job, the execution will not be marked as finished either.
sorry, I meant if the application is killed, so the event log just ends abruptly. I think the old history server code would still show you the updated metrics for all the tasks that had completed. But seems like after this change, the history server won't show anything for any jobs which hadn't completed.
That's true; I've actually fixed that in the M6 code:
https://github.com/vanzin/spark/pull/51/files#diff-a74d84702d8d47d5269e96740a55a3caR56
It's not very easy to fix here without writing throw-away code to propagate some "close()" call to this listener.
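For reference, such a final flush would look roughly like the hypothetical sketch below. There is no such hook in this PR; the close() name is illustrative, and the fields are borrowed from names that appear elsewhere in the diff.

```scala
// Hypothetical: invoked once after event replay finishes, so executions whose end events
// never made it into the log still surface whatever metrics were gathered before the cut-off.
def close(): Unit = {
  val now = System.nanoTime()
  liveExecutions.values.foreach { exec =>
    exec.metricValues = aggregateMetrics(exec)  // coalesce the partial, in-memory metrics
    exec.write(kvstore, now)                    // persist the partially-completed execution
  }
  liveExecutions.clear()
}
```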
private def isSQLStage(stageId: Int): Boolean = {
  liveExecutions.values.exists { exec =>
    exec.stages.contains(stageId)
any reason not to make exec.stages a Set? I guess it would normally be small so it doesn't matter, but seems like it couldn't hurt to protect against some strange SQL query with some huge number of stages.
Test build #83570 has started for PR 19681 at commit
@@ -40,7 +40,7 @@ private[sql] class SQLAppStatusListener(
     ui: Option[SparkUI] = None)
   extends SparkListener with Logging {

-  // How often to flush intermediate statge of a live execution to the store. When replaying logs,
+  // How often to flush intermediate stage of a live execution to the store. When replaying logs,
err, was this supposed to be "state"?
Test build #83585 has finished for PR 19681 at commit
retest this please
I think I have a reasonable handle on the SQL UI stuff now.
Are there any tests for the SQL UI in the history server? I don't see anything ...
    val stageId: Int,
    var attemptId: Int,
    val accumulatorIds: Array[Long],
    val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
can you include a comment explaining the threading concerns in this class? at first I didn't think the CHM or the synchronized were necessary since the listener methods are just called from one thread, but then I realized the UI will also call executionMetrics(executionId) (I think that is the only reason?). Also I think there are more synchronized than necessary -- both sites where aggregateMetrics is called have already acquired the lock, so it shouldn't need to take it again. Though it doesn't hurt, it can be confusing if it's not clear where the lock is supposed to be acquired.
I changed the implementation a bit since it wasn't completely correct. Added some comments in a few places.
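For illustration, the kind of comment being asked for might read like the sketch below. The wording is mine, not the text actually added to the PR; the class name follows the LiveStageMetrics / LiveTaskMetrics names used in this thread.

```scala
import java.util.concurrent.ConcurrentHashMap

// Threading note (illustrative): all SparkListener callbacks arrive on a single bus thread,
// but the UI thread may call executionMetrics(executionId) at any time while an execution is
// live. taskMetrics is therefore a ConcurrentHashMap, and the code paths that aggregate these
// values are synchronized against each other.
private class LiveStageMetrics(
    val stageId: Int,
    var attemptId: Int,
    val accumulatorIds: Array[Long],
    val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
```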
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.internal.StaticSQLConf._ |
unused import
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} |
SparkListenerEvent is unused
logWarning(s"A task should not have a higher stageAttemptID ($stageAttemptID) then " + | ||
s"what we have seen (${stageMetrics.stageAttemptId})") | ||
} else { | ||
// TODO We don't know the attemptId. Currently, what we can do is overriding the |
this comment was hard to make sense of (you shouldn't ever have two tasks with the same taskId, even with speculation), but I think there is something here which may still be worth mentioning. You aggregate metrics across all attempts for a given task (aka "index"), even speculative ones (before and after your change) -- I'd mention that in a comment.
(The index is available in onTaskStart / End if we wanted to de-duplicate.)
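Purely as a sketch of that de-duplication idea (not something this PR does): keying the per-stage task map by task index instead of task ID would collapse speculative and retried attempts into a single entry per logical task. The stageMetrics map and the extractMetrics helper below are hypothetical.

```scala
import org.apache.spark.scheduler.SparkListenerTaskEnd

// Hypothetical: inside the listener, overwrite by index so only the latest attempt of each
// logical task contributes to the aggregated metrics.
override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
  val info = event.taskInfo
  stageMetrics.get(event.stageId).foreach { stage =>
    // stage.taskMetrics would be keyed by task index (Int) rather than task ID (Long).
    stage.taskMetrics.put(info.index, extractMetrics(info))
  }
}
```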
@@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
    withTempPath { file =>
      // person creates a temporary view. get the DF before listing previous execution IDs
      val data = person.select('name)
-     sparkContext.listenerBus.waitUntilEmpty(10000)
don't you still need this waitUntilEmpty?
person.select() doesn't generate any events.
No, because those tests are in …
Test build #83603 has finished for PR 19681 at commit
Test build #83609 has finished for PR 19681 at commit
Test build #83620 has finished for PR 19681 at commit
OK -- is there a jira for adding those tests?
Not that I remember.
val exec = store.read(classOf[SQLExecutionUIData], executionId)
Option(exec.metricValues)
  .orElse(listener.map(_.executionMetrics(executionId)))
  .getOrElse(Map())
is there a race here when the execution ends?

- T1 (UI thread, calling this method): execution hasn't ended, so exec.metricValues is null
- T2 (listener): execution ends, drops the execution from liveExecutions
- T1: _.executionMetrics(executionId) throws an exception because it's been dropped from liveExecutions
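One way to make the read tolerant of that interleaving would be something like the sketch below (illustrative only, and assuming the listener surfaces the missing execution as a NoSuchElementException): treat a dropped live execution as "already finalized" and re-read the store.

```scala
def executionMetrics(executionId: Long): Map[Long, String] = {
  def storedMetrics: Option[Map[Long, String]] =
    Option(store.read(classOf[SQLExecutionUIData], executionId).metricValues)

  def liveMetrics: Option[Map[Long, String]] =
    listener.flatMap { l =>
      try {
        Some(l.executionMetrics(executionId))
      } catch {
        // The listener dropped the execution between our two reads; by then it has already
        // written the final metrics to the store, so fall through and read the store again.
        case _: NoSuchElementException => None
      }
    }

  storedMetrics.orElse(liveMetrics).orElse(storedMetrics).getOrElse(Map())
}
```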
  }
}

// TODO: storing metrics by task ID can lead to innacurate metrics when speculation is on. |
I think it's more general than this. I'd say:
Since we store metrics by taskID, in a way we'll double-count the stage metrics when there are multiple tasks for a given index -- in particular, if there is speculation, or if there are multiple attempts for a task.
Test build #83658 has finished for PR 19681 at commit
lgtm need to fix the merge conflict though
…ryError

## What changes were proposed in this pull request?

This PR addresses the issue [SPARK-22471](https://issues.apache.org/jira/browse/SPARK-22471). The modified version of `SQLListener` respects the setting `spark.ui.retainedStages` and keeps the number of the tracked stages within the specified limit. The hash map `_stageIdToStageMetrics` does not outgrow the limit, hence overall memory consumption does not grow with time anymore.

A 2.2-compatible fix. Maybe incompatible with 2.3 due to #19681.

## How was this patch tested?

A new unit test covers this fix - see `SQLListenerMemorySuite.scala`.

Author: Arseniy Tashoyan <tashoyan@gmail.com>

Closes #19711 from tashoyan/SPARK-22471-branch-2.2.
Test build #83802 has finished for PR 19681 at commit
@vanzin merge conflict since I merged the Job & stage page change
Test build #83855 has finished for PR 19681 at commit
merged to master
  }
}

override def setupUI(ui: SparkUI): Unit = { |
Do we have a clear rule about when setupListeners is called and when setupUI is called? Here we register SQLAppStatusListener in both setupListeners and setupUI, will we register it twice?
The calls are made in specific cases (setupListeners when setting up a listener bus, setupUI when setting up the UI, always). But this implementation has to be a little weird because we don't want the SQL UI if SQL hasn't been initialized, if we're to maintain the old behavior.
I don't think the listener is installed twice here - here it's only installed for non-live applications (= SHS) and below it's only installed if there's a SparkContext (= live application).
If we're ok to modify the existing behavior and always have the SQL tab, this can be simplified a lot.
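Spelled out as a hedged sketch, the split described above looks roughly like this. The constructor arguments, the kvStoreOf accessor, and the attachSqlTab helper are illustrative stand-ins, not the PR's exact code.

```scala
override def setupListeners(
    conf: SparkConf,
    store: KVStore,
    addListenerFn: SparkListener => Unit,
    live: Boolean): Unit = {
  // Only replayed applications (the SHS) get the listener registered here; live apps are
  // handled in setupUI so the SQL tab only shows up once SQL is actually used.
  if (!live) {
    addListenerFn(new SQLAppStatusListener(conf, store, live = false))
  }
}

override def setupUI(ui: SparkUI): Unit = {
  ui.sc match {
    case Some(sc) =>
      // Live application: register the listener on the live bus; it creates the SQL tab
      // lazily, when the first execution starts.
      sc.addSparkListener(new SQLAppStatusListener(sc.conf, kvStoreOf(ui), live = true, Some(ui)))
    case _ =>
      // SHS: the listener was already registered in setupListeners, so only attach the tab.
      attachSqlTab(ui)
  }
}

// Hypothetical helpers, stubbed out for the sketch:
private def kvStoreOf(ui: SparkUI): KVStore = ???
private def attachSqlTab(ui: SparkUI): Unit = ???
```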
_ui.foreach(_.bind())
_ui.foreach { ui =>
// Load any plugins that might want to modify the UI.
AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui)) |
@vanzin the live UI doesn't need a 2-step process to set up the UI, while the history server does. That's why I think they should not share one plugin interface.
Let's continue the discussion on the other PR.
  def waitTillExecutionFinished(): Unit = {
-   while (listener.getCompletedExecutions.isEmpty) {
-     Thread.sleep(100)
+   while (statusStore.executionsList().size < oldCount) {
This doesn't mean the execution has ended now, only that it has started.
True, but the lines below:
// Wait for listener to finish computing the metrics for the execution.
while (statusStore.executionsList().last.metricValues == null) {
Thread.sleep(100)
}
Basically wait for that new execution to end (which is when the coalesced metricValues field is populated).
val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
sqlContext.sparkContext.addSparkListener(listener) |
Does it really work? The listener is attached to a ReplayBus, not the real spark event bus.
Yes, it works. There are two kinds of tests in this suite now:
- "test(blah)" like this one, which uses the active spark session's listener
- "sqlStoreTest", which manually drives a replay bus and verifies expected changes in the store

This particular test (and the other one you commented on) are of the first kind. Tests of the other kind do not run actual jobs, they just inject events into the replay bus manually.
}

test("SPARK-11126: no memory leak when running non SQL jobs") {
-  val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
+  val previousStageNumber = statusStore.executionsList().size
   spark.sparkContext.parallelize(1 to 10).foreach(i => ())
spark.sparkContext.listenerBus.waitUntilEmpty(10000) |
ditto, the listener is not attached to the spark event bus.
class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
  import testImplicits._
  import org.apache.spark.AccumulatorSuite.makeInfo

override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) |
the spark context is shared for all test suites, so we should only set this conf to 0 in this suite.
Is that true? The suite extends SharedSQLContext (which extends SharedSparkSession) and SQLTestUtils, all of which are traits, not objects. (Unlike TestHive, which does force sessions to be used across suites for hive tests.)

There are also other suites that modify the conf (such as HDFSMetadataLogSuite).
ah you are right, it's only shared in hive tests
}

exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
exec.stages = event.stageIds.toSet |
@vanzin, shall we add the stageIds to the existing stageIds? Otherwise we will lose the stageIds of previous jobs.
oh good catch. I can submit a fix for this
## What changes were proposed in this pull request?

In #19681 we introduced a new interface called `AppStatusPlugin`, to register listeners and set up the UI for both live and history UI. However I think it's an overkill for live UI. For example, we should not register `SQLListener` if users are not using SQL functions. Previously we register the `SQLListener` and set up SQL tab when `SparkSession` is firstly created, which indicates users are going to use SQL functions. But in #19681 , we register the SQL functions during `SparkContext` creation. The same thing should apply to streaming too.

I think we should keep the previous behavior, and only use this new interface for history server. To reflect this change, I also rename the new interface to `SparkHistoryUIPlugin`

This PR also refines the tests for sql listener.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19981 from cloud-fan/listener.