New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-34087] Remove SparkSession from ExecutionListenerBus #31156
Conversation
ok to test. |
Kubernetes integration test starting |
Test build #134003 has finished for PR 31156 at commit
|
@@ -124,13 +131,27 @@ class ExecutionListenerManager private[sql](session: SparkSession, loadExtension | |||
} | |||
} | |||
|
|||
private[sql] class ExecutionListenerBus(session: SparkSession) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, ExecutionListenerBus
is created per SparkSession intentionally. With your change, it seems to break the assumption of "Only catch SQL execution with a name, and triggered by the same spark session that this listener manager belongs."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't break the assumption with this change. SQL executions will catch by the launched session listener manager because we only send the SparkListenerSQLExecutionStart
event to the launched session listener manager (see the change in the file SQLExecution.scala
) and process the SparkListenerSQLExecutionEnd
event with the same listener manager.
// queryExecution.executedPlan may throw an exception and made the send event message | ||
// failed. we should send SparkListenerSQLExecutionStart event to listener bus when we | ||
// catch an exception. | ||
val sparkPlanInfo = try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to fix a separate issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use SparkListenerSQLExecutionStart
event to add ExecutionListenerBus.activeQueryExecutionIds
here, if we don't send SparkListenerSQLExecutionStart
event messages when an exception throw, some tests will be failed.
Kubernetes integration test status success |
null | ||
} | ||
|
||
sparkSession.listenerManager.post(SparkListenerSQLExecutionStart( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We send SparkListenerSQLExecutionStart
event through sparkSession.listenerManager.post
, this method will only send SQL executions event to the listener manager created by the SQL launched session, not all of the listener managers.
What changes were proposed in this pull request?
We should not reference sparkSession in
org.apache.spark.sql.util.ExecutionListenerBus
, otherwise, JVM can't collect expired sessions and make a memory leak.How was this patch tested?
manually tested.