[SPARK-25680][SQL] SQL execution listener shouldn't happen on execution thread #22674
Conversation
- extends SparkListenerEvent
+ extends SparkListenerEvent {
+   @JsonIgnore private[sql] var executionName: Option[String] = None
For backward compatibility, I make these new fields `var`s.
Why do we want to be backwards compatible here? SHS?
It's a developer API, which is public. The backward-compatibility guarantee is weaker than for end-user public APIs, but we should still keep them unchanged if it's not too hard.
That said, a developer can write a Spark listener and catch this event.
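To make the compatibility concern concrete, here is a hedged sketch of the kind of third-party listener this comment describes. It assumes Spark's developer API around the time of this PR (`SparkListener.onOtherEvent` and a `SparkListenerSQLExecutionEnd` event carrying `executionId` and `time`); the class name is hypothetical, not code from the PR.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

// A third-party listener compiled against the old event shape. If the new
// fields were added as constructor parameters instead of vars, code like
// this could break against the new Spark version.
class MyExecutionWatcher extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} ended at ${e.time}")
    case _ => // ignore everything else
  }
}
```

It would typically be registered with `spark.sparkContext.addSparkListener(new MyExecutionWatcher)`, which is why the event's binary shape matters.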
- override def clone(): ExecutionListenerManager = writeLock {
-   val newListenerManager = new ExecutionListenerManager
-   listeners.foreach(newListenerManager.register)
+ def clone(session: SparkSession): ExecutionListenerManager = {
I don't know why this method was public in the first place... I have to break it here.
Could you add MiMa exclusion rule?
Test build #97122 has finished for PR 22674 at commit
Test build #97123 has finished for PR 22674 at commit
Test build #97140 has finished for PR 22674 at commit
Test build #97145 has finished for PR 22674 at commit
retest this please
looks good
@@ -75,95 +76,69 @@ trait QueryExecutionListener {
   */
  @Experimental
  @InterfaceStability.Evolving
- class ExecutionListenerManager private extends Logging {
+ class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean)
nit: we should add param comments.
Test build #97159 has finished for PR 22674 at commit
Test build #97161 has finished for PR 22674 at commit
- private[sql] def this(conf: SparkConf) = {
-   this()
+ // The `session` is used to indicate which session carries this listener manager, and we only
Why is this not a class doc?
The constructor is private, so we should not make it visible in the class doc.
- wl.unlock()
- }
+ private def shouldCatchEvent(e: SparkListenerSQLExecutionEnd): Boolean = {
+   // Only catch SQL execution with a name, and triggered by the same spark session that this
So this is what bugs me. You are adding separation between the SparkSession and its listeners, only to undo it here. It seems like a bit of a hassle to go through because you basically need async execution.
Yea. Assume we have many Spark sessions running queries at the same time: each session sends query execution events to the central event bus, and sets up a listener to watch its own query execution events asynchronously.
To make this work, the most straightforward way is to carry the session identifier in the events, so that a listener only watches events with the expected session identifier.
Maybe a better way is to introduce sessions in Spark core, so the listener framework can dispatch events w.r.t. session automatically. But that's a lot of work.
We had the same problem in StreamingQueryListener. You can check how we solved it in StreamExecution. Since each SparkSession will have its own ExecutionListenerManager, you may be able to have only the proper ExecutionListenerManager deal with its own messages.
@brkyvz thanks for the information! It seems the StreamingQueryListener framework picks the same idea but the implementation is better. I'll update my PR accordingly.
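The "carry the session identifier in the events" idea can be sketched in plain Scala. All names here are hypothetical, not the PR's actual classes: events are tagged with a session id, and each per-session manager ignores events posted by other sessions sharing the same bus.

```scala
// Hypothetical event: a finished query tagged with the session that ran it.
case class QueryFinished(sessionId: String, name: String)

// Hypothetical per-session manager: reacts only to its own session's events.
class SessionScopedManager(sessionId: String) {
  def onEvent(e: QueryFinished): Option[String] =
    // Events from other sessions on the shared bus are silently dropped.
    if (e.sessionId == sessionId) Some(s"handled ${e.name}") else None
}
```

This is the same dispatch shape StreamExecution uses for streaming query events: one shared bus, with filtering done by each consumer.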
+ val funcName = e.executionName.get
+ e.executionFailure match {
+   case Some(ex) =>
+     listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, ex))
This is a bit of a high-level thought: you could consider making the calling event queue responsible for dispatching these events. That way you can leverage any improvement to the underlying event bus.
ExecutionListenerManager is already a listener, which runs in a separate thread, receiving events from LiveListenerBus.
This is a much larger change than I was expecting, but definitely a better one than what I had imagined. Left minor comments.
- case e: Exception =>
-   sparkSession.listenerManager.onFailure(name, qe, e)
-   throw e
+ qe.executedPlan.foreach { plan =>
Can this throw an exception? Imagine if df.count() threw an exception, and then you run it again. Won't this be a behavior change in that case?
I don't think resetMetrics can throw an exception...
Can't executedPlan throw an exception? I thought it could if the original Spark plan failed?
ah i see your point here
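The subtlety being conceded here can be shown with plain Scala (a self-contained illustration, not Spark code; the `Planner` class is hypothetical): `executedPlan` is a lazy val, and a Scala lazy val whose initializer throws is not cached as failed, so the next access retries the initializer. Re-running a failed query therefore re-plans instead of rethrowing a stale error.

```scala
// Stand-in for a QueryExecution with a lazy, possibly-failing plan.
class Planner(var shouldFail: Boolean) {
  var attempts = 0
  lazy val plan: String = {
    attempts += 1
    if (shouldFail) throw new RuntimeException("planning failed")
    "ok"
  }
}
```

First access throws while `shouldFail` is true; after flipping the flag, the next access re-runs the initializer and succeeds, so `attempts` ends up at 2.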
+ // can specify the execution name in more places in the future, so that
+ // `QueryExecutionListener` can track more cases.
+ event.executionName = name
+ event.duration = endTime - startTime
duration used to be reported in nanos. Now it's millis. I would still report it as nanos if possible.
ah good catch!
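A minimal sketch of the suggested fix: measure both endpoints with System.nanoTime so the reported duration stays in nanoseconds, as it was before this PR (the helper name here is hypothetical).

```scala
// Run `body` and return its result together with the elapsed time in
// nanoseconds, matching the old reporting unit of the listener API.
def timedNanos[T](body: => T): (T, Long) = {
  val start = System.nanoTime()
  val result = body
  (result, System.nanoTime() - start)
}
```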
Test build #97200 has finished for PR 22674 at commit
Test build #97203 has finished for PR 22674 at commit
retest this please
Test build #97231 has finished for PR 22674 at commit
hmm, seems it failed at the same test.
I couldn't reproduce it locally, let me try again
retest this please
I would just up the timeout in that suite. Now that we're pushing a bunch more stuff to the LiveListenerBus, it may not be draining quickly enough. On slow Jenkins machines it could likely cause flakiness.
Test build #97251 has finished for PR 22674 at commit
Test build #97269 has finished for PR 22674 at commit
Test build #97277 has finished for PR 22674 at commit
Test build #97291 has finished for PR 22674 at commit
+ // The following 3 fields are only accessed when `executionName` is defined.
+
+ // The duration of the SQL execution, in nanoseconds.
+ @JsonIgnore private[sql] var duration: Long = 0L
did you verify that the @JsonIgnore annotation actually works? For some reason, I actually needed to annotate the class as

@JsonIgnoreProperties(Array("a", "b", "c"))
class SomeClass {
  @JsonProperty("a") val a: ...
  @JsonProperty("b") val b: ...
}

the reason being Json4s understands that API better. I believe we use Json4s for all of these events.
There is a test to verify it: https://github.com/apache/spark/pull/22674/files#diff-6fa1d00d1cb20554dda238f2a3bc3ecbR55
I also used @JsonIgnoreProperties before, when I put these fields in the case class constructor. It seems we don't need @JsonIgnoreProperties when they are private vars.
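For reference, the two Jackson annotation patterns discussed above, side by side. This is an illustrative sketch (class and field names hypothetical) that assumes the jackson-annotations library on the classpath, as Spark's event logging does.

```scala
import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties}

// Pattern 1: private vars excluded with a field-level annotation. The
// discussion above found this sufficient when the fields are private vars.
class EventA {
  @JsonIgnore private[this] var internal: Long = 0L
}

// Pattern 2: constructor parameters excluded at the class level, which the
// reviewer needed when the fields were case-class constructor parameters.
@JsonIgnoreProperties(Array("internal"))
case class EventB(name: String, internal: Long)
```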
Test build #97397 has finished for PR 22674 at commit
LGTM, do you have any other concerns @hvanhovell @brkyvz @dongjoon-hyun?
since there is no objection, I'm merging it to master, thanks!
[SPARK-25680][SQL] SQL execution listener shouldn't happen on execution thread
Closes apache#22674 from cloud-fan/listener.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
The SQL execution listener framework was created from scratch (see #9078). It didn't leverage what we already have in the Spark listener framework, and one major problem is that the listener runs on the Spark execution thread, which means a bad listener can block Spark's query processing.
This PR re-implements the SQL execution listener framework. Now ExecutionListenerManager is just a normal Spark listener, which watches SparkListenerSQLExecutionEnd events and posts them to the user-provided SQL execution listeners.
How was this patch tested?
existing tests.