-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent #15675
Conversation
@@ -39,12 +39,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) | |||
* be dispatched to all StreamingQueryListener in the thread of the Spark listener bus. | |||
*/ | |||
def post(event: StreamingQueryListener.Event) { | |||
event match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually I didn't see any reason to distinguish these two types of event when posting
Test build #67722 has finished for PR 15675 at commit
|
It's intentional. Just to provide a synchronous callback for the |
Ideally, it should not use the listener api for a synchronous callback. But right now I don't have any good API design for this one. |
Sorry, I misunderstood the real reason of the bug The real reason is actually very simple...the original postToAll() is actually calling |
Test build #67737 has finished for PR 15675 at commit
|
@zsxwing mind looking at it again? |
@@ -41,7 +41,9 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) | |||
def post(event: StreamingQueryListener.Event) { | |||
event match { | |||
case s: QueryStartedEvent => | |||
postToAll(s) | |||
sparkListenerBus.postToAll(s) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may break the existing spark listeners because they are not thread-safe.
@@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) | |||
def post(event: StreamingQueryListener.Event) { | |||
event match { | |||
case s: QueryStartedEvent => | |||
sparkListenerBus.post(s) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QueryStartedEvent
will be sent twice to StreamingQueryListener
. There is a thread local variable in LiveListenerBus
. You can use it to ignore duplicated QueryStartedEvent
if it's posted in the listener thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is hacky, it's better to have some tests to make sure we won't break things in future when removing the hacky codes.
Test build #67833 has finished for PR 15675 at commit
|
Test build #67851 has finished for PR 15675 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Just some nits
// synchronously and to listeners attached to LiveListenerBus asynchronously. Therefore, | ||
// we need to ignore QueryStartedEvent if this method is called within SparkListenerBus | ||
// thread | ||
if (Thread.currentThread().getName != "SparkListenerBus" || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please use !LiveListenerBus.withinListenerThread.value
instead.
@@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { | |||
// A StreamingQueryListener that gets the query status after the first completed trigger | |||
val listener = new StreamingQueryListener { | |||
@volatile var firstStatus: StreamingQueryStatus = null | |||
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please add @volatile
Test build #67942 has finished for PR 15675 at commit
|
LGTM. Thanks! Merging to master, 2.1 and 2.0. |
## What changes were proposed in this pull request? The PR fixes the bug that the QueryStartedEvent is not logged the postToAll() in the original code is actually calling StreamingQueryListenerBus.postToAll() which has no listener at all....we shall post by sparkListenerBus.postToAll(s) and this.postToAll() to trigger local listeners as well as the listeners registered in LiveListenerBus zsxwing ## How was this patch tested? The following snapshot shows that QueryStartedEvent has been logged correctly ![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png) Author: CodingCat <zhunansjtu@gmail.com> Closes #15675 from CodingCat/SPARK-18144. (cherry picked from commit 85c5424) Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
## What changes were proposed in this pull request? The PR fixes the bug that the QueryStartedEvent is not logged the postToAll() in the original code is actually calling StreamingQueryListenerBus.postToAll() which has no listener at all....we shall post by sparkListenerBus.postToAll(s) and this.postToAll() to trigger local listeners as well as the listeners registered in LiveListenerBus zsxwing ## How was this patch tested? The following snapshot shows that QueryStartedEvent has been logged correctly ![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png) Author: CodingCat <zhunansjtu@gmail.com> Closes #15675 from CodingCat/SPARK-18144. (cherry picked from commit 85c5424) Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
thanks |
## What changes were proposed in this pull request? The PR fixes the bug that the QueryStartedEvent is not logged the postToAll() in the original code is actually calling StreamingQueryListenerBus.postToAll() which has no listener at all....we shall post by sparkListenerBus.postToAll(s) and this.postToAll() to trigger local listeners as well as the listeners registered in LiveListenerBus zsxwing ## How was this patch tested? The following snapshot shows that QueryStartedEvent has been logged correctly ![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png) Author: CodingCat <zhunansjtu@gmail.com> Closes apache#15675 from CodingCat/SPARK-18144.
What changes were proposed in this pull request?
The PR fixes the bug that the QueryStartedEvent is not logged
the postToAll() in the original code is actually calling StreamingQueryListenerBus.postToAll() which has no listener at all....we shall post by sparkListenerBus.postToAll(s) and this.postToAll() to trigger local listeners as well as the listeners registered in LiveListenerBus
@zsxwing
How was this patch tested?
The following snapshot shows that QueryStartedEvent has been logged correctly