[SPARK-15580][SQL] Add ContinuousQueryInfo to make ContinuousQueryListener events serializable #13335
Conversation
Test build #59415 has finished for PR 13335 at commit
@@ -484,6 +484,15 @@ class StreamExecution(
       """.stripMargin
   }

+  private def toInfo: ContinuousQueryInfo = {
Do we need to make this a deep copy, especially for sourceStatuses and sinkStatus?
No. They are immutable.
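To illustrate why immutability makes a shallow copy safe here, a minimal standalone sketch (the names are illustrative, not the actual Spark classes): when the copied references point only at immutable values, sharing them cannot lead to aliasing bugs.

```scala
// Stand-in for a status object: a case class whose fields are all vals,
// so instances cannot be mutated after construction.
case class SourceStatusDemo(description: String, offsetDesc: Option[String])

object ShallowCopyDemo {
  def main(args: Array[String]): Unit = {
    val statuses = Seq(SourceStatusDemo("source-0", Some("offset-5")))
    // A "shallow copy" just shares the same immutable elements; neither
    // reference provides any way to modify them.
    val shared = statuses
    assert(shared.head eq statuses.head) // same object, safely shared
    assert(shared.head.offsetDesc.contains("offset-5"))
    println("shallow copy of immutable values is safe")
  }
}
```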
 * A class that contains information about [[ContinuousQuery]].
 */
@Experimental
case class ContinuousQueryInfo(
We should scaladoc these parameters.
We also probably don't want this to be a case class. The issue is that if we add new parameters in the future, the unapply method will break binary compatibility. The constructor should also be private.
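A hedged sketch of the suggested shape (illustrative names, not the actual Spark code): a plain class with a private constructor and a companion factory has no compiler-generated unapply, so adding a field later cannot break pattern-matching call sites, and callers never depend on the constructor signature directly.

```scala
// Plain class instead of a case class: no generated apply/unapply/copy,
// so the public surface stays under the library author's control.
class QueryInfoDemo private (
    val name: String,
    val sourceStatuses: Seq[String]) {
  override def toString: String =
    s"QueryInfoDemo($name, sources=${sourceStatuses.mkString(",")})"
}

object QueryInfoDemo {
  // Companion factory: call sites go through here, so the constructor
  // can gain parameters later without breaking them.
  def apply(name: String, sourceStatuses: Seq[String]): QueryInfoDemo =
    new QueryInfoDemo(name, sourceStatuses)
}
```

By contrast, a case class's unapply returns a tuple whose arity is baked into compiled client code, which is exactly the binary-compatibility hazard the reviewer describes.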
Removed case.
Test build #59502 has finished for PR 13335 at commit
    }
  }

  override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
    asyncTestWaiter {
      assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
-     terminationStatus = QueryStatus(queryTerminated.query)
+     terminationStatus = queryTerminated.queryInfo
The exception is not tested.
Added a test for the exception.
Can you include the JSON produced as a sanity check?
Test build #59514 has finished for PR 13335 at commit
I reverted c24b6b6 as there is an Option serialization bug in jackson-module-scala 2.7.3 (FasterXML/jackson-module-scala#240).
@@ -22,7 +22,7 @@ package org.apache.spark.sql.execution.streaming
 * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
 * vector clock that must progress linearly forward.
 */
-case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
+case class CompositeOffset(offsets: Array[Option[Offset]]) extends Offset {
Changed the type to Array as jackson-module-scala doesn't support Seq[Option[X]], probably because of type erasure.
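A small standalone demo of the erasure point (not the actual CompositeOffset code): on the JVM a Seq's type parameter is erased at runtime, so a deserializer cannot see the Option element type, whereas arrays are reified and keep their component class.

```scala
object ErasureDemo {
  def main(args: Array[String]): Unit = {
    val seq: Seq[Option[Int]] = Seq(Some(1), None)
    val arr: Array[Option[Int]] = Array(Some(1), None)

    // At runtime the Seq only knows its concrete collection class;
    // the Option element type has been erased.
    println(seq.getClass.getName)

    // The array's component type survives erasure: scala.Option.
    println(arr.getClass.getComponentType.getName) // scala.Option
  }
}
```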
Test build #59608 has finished for PR 13335 at commit
Removed this one and put it in #13417. Will update this one once #13417 gets merged.
Test build #59674 has finished for PR 13335 at commit
Test build #59685 has finished for PR 13335 at commit
 * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark
 * listener bus.
 */
case class WrappedContinuousQueryListenerEvent(
Moved it out of ContinuousQueryListenerBus in order to eliminate the reference.
I don't actually understand the structure of all the different event buses, but why do we need a different event type? Why don't all the events just extend SparkListenerEvent?
Good point. This pattern is from the Streaming events. But that's for binary compatibility. We don't need this pattern in the new APIs.
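The two designs under discussion can be sketched like this (simplified stand-in names, not the actual Spark classes): either box each streaming event in a wrapper before posting it to the shared bus, or let the events extend the bus's event type directly.

```scala
// Stand-ins for SparkListenerEvent and the streaming-side event trait.
trait BusEvent
trait StreamEvent

// Option 1: wrap each streaming event before posting to the bus.
// Useful when the streaming event hierarchy must stay independent
// (e.g. for binary compatibility with an older API).
case class WrappedStreamEvent(event: StreamEvent) extends BusEvent

// Option 2 (the reviewer's suggestion): with no compatibility baggage,
// a new event can simply extend both and be posted directly.
case class QueryStartedDemo(name: String) extends BusEvent with StreamEvent
```

The wrapper indirection only earns its keep when the two hierarchies cannot share a supertype; for a brand-new API, direct extension is simpler.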
@marmbrus could you take another look, please?
@marmbrus addressed.
Test build #59752 has finished for PR 13335 at commit
Test build #59758 has finished for PR 13335 at commit
Test build #59760 has finished for PR 13335 at commit
Test build #59771 has finished for PR 13335 at commit
@@ -31,4 +31,4 @@ import org.apache.spark.sql.execution.streaming.{Offset, Source}
 @Experimental
 class SourceStatus private[sql] (
   val description: String,
-  val offset: Option[Offset])
+  val offset: Option[String])
offset --> offsetDesc, so that we can expose actual offsets in the future without the naming being weird.
@tdas addressed.
@Experimental
class ContinuousQueryInfo private[sql](
  val name: String,
  val sourceStatuses: Seq[SourceStatus],
This should also probably be an Array. No need to be a Seq.
Offline discussion: this is okay. This is how it is for SparkListener, and it avoids problems like the array being accidentally modified internally by the listener generator.
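A standalone sketch of the mutation hazard being avoided: an Array handed out to listeners can be modified in place, silently corrupting the statuses seen by every other consumer, while a Seq exposes no in-place update.

```scala
object SeqVsArrayDemo {
  def main(args: Array[String]): Unit = {
    // Arrays are mutable: any listener holding this reference could
    // overwrite an element for everyone else.
    val asArray = Array("source-0", "source-1")
    asArray(0) = "oops"
    assert(asArray(0) == "oops")

    // An immutable Seq has no update method, so this cannot happen:
    val asSeq: Seq[String] = Seq("source-0", "source-1")
    // asSeq(0) = "oops"  // does not compile
    assert(asSeq(0) == "source-0")
    println("Seq cannot be mutated in place")
  }
}
```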
LGTM. Will merge after tests pass.
Test build #60069 has finished for PR 13335 at commit
Test build #60072 has finished for PR 13335 at commit
Merging this to master and 2.0. Thanks @zsxwing!
[SPARK-15580][SQL] Add ContinuousQueryInfo to make ContinuousQueryListener events serializable

What changes were proposed in this pull request?
This PR adds ContinuousQueryInfo to make ContinuousQueryListener events serializable in order to support writing events into the event log.

How was this patch tested?
Jenkins unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>
Closes #13335 from zsxwing/query-info.
(cherry picked from commit 0cfd619)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>