[SPARK-15580][SQL]Add ContinuousQueryInfo to make ContinuousQueryListener events serializable #13335

Closed
wants to merge 13 commits into from
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
- import org.apache.spark.sql.util.ContinuousQueryListener
+ import org.apache.spark.sql.util.{ContinuousQueryInfo, ContinuousQueryListener}
import org.apache.spark.sql.util.ContinuousQueryListener._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

@@ -167,7 +167,7 @@ class StreamExecution(
// Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
// so must mark this as ACTIVE first.
state = ACTIVE
- postEvent(new QueryStarted(this)) // Assumption: Does not throw exception.
+ postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw exception.

// Unblock starting thread
startLatch.countDown()
@@ -206,7 +206,7 @@ class StreamExecution(
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
- postEvent(new QueryTerminated(this))
+ postEvent(new QueryTerminated(this.toInfo))
terminationLatch.countDown()
}
}
@@ -374,7 +374,7 @@ class StreamExecution(
logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
// Update committed offsets.
committedOffsets ++= availableOffsets
- postEvent(new QueryProgress(this))
+ postEvent(new QueryProgress(this.toInfo))
}

private def postEvent(event: ContinuousQueryListener.Event) {
@@ -484,6 +484,15 @@ class StreamExecution(
""".stripMargin
}

+ private def toInfo: ContinuousQueryInfo = {
Contributor

Do we need to make this a deep copy, especially for sourceStatuses and sinkStatus?

Member Author

No. They are immutable.
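
To illustrate the point about deep copies, here is a minimal sketch. It assumes, as the author states, that the status values are immutable. `Status` and `Execution` are hypothetical stand-ins, not Spark classes. A snapshot taken before the execution advances keeps its old contents, because the execution replaces its reference rather than mutating the shared values.

```scala
object SnapshotSketch {
  // Hypothetical immutable status value; the real SourceStatus may differ.
  final case class Status(description: String, offset: Option[Long])

  // Hypothetical stand-in for the running query.
  final class Execution {
    @volatile private var statuses: Seq[Status] = List(Status("MemoryStream", None))

    // Returns the current immutable list; callers may keep it safely.
    def sourceStatuses: Seq[Status] = statuses

    // Progress builds a new list instead of mutating the old one.
    def advance(offset: Long): Unit =
      statuses = statuses.map(_.copy(offset = Some(offset)))
  }
}
```

If the statuses were backed by a mutable collection such as an `Array`, the same snapshot could change underneath the listener, and a copy would be needed.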

+ ContinuousQueryInfo(
+ this.name,
+ this.isActive,
+ this.sourceStatuses,
+ this.sinkStatus,
+ this.exception)
+ }

trait State
case object INITIALIZED extends State
case object ACTIVE extends State
@@ -18,7 +18,7 @@
package org.apache.spark.sql.util

import org.apache.spark.annotation.Experimental
- import org.apache.spark.sql.ContinuousQuery
+ import org.apache.spark.sql._
import org.apache.spark.sql.util.ContinuousQueryListener._

/**
@@ -65,11 +65,23 @@ object ContinuousQueryListener {
trait Event

/** Event representing the start of a query */
- class QueryStarted private[sql](val query: ContinuousQuery) extends Event
+ class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event

/** Event representing any progress updates in a query */
- class QueryProgress private[sql](val query: ContinuousQuery) extends Event
+ class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event

/** Event representing that termination of a query */
- class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
+ class QueryTerminated private[sql](val queryInfo: ContinuousQueryInfo) extends Event
Contributor @marmbrus, May 27, 2016

Should the data just be in the different events? It's a little odd that Progress includes an exception and Terminated includes isActive. We should work out the minimal amount of data to expose here that still lets listeners do everything useful.

Member Author

Removed isActive and moved exception to QueryTerminated.
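
A rough sketch of the event shape this exchange points toward, where each event carries only the data that makes sense for it. The class and field names below are illustrative assumptions, not the signatures that were merged. In particular, `QueryInfo` is a hypothetical stand-in for `ContinuousQueryInfo`, and the failure is shown as a `String`.

```scala
object EventSketch {
  // Hypothetical, trimmed-down info object shared by all events.
  final class QueryInfo(val name: String)

  sealed trait Event
  final class QueryStarted(val queryInfo: QueryInfo) extends Event
  final class QueryProgress(val queryInfo: QueryInfo) extends Event
  // Only the termination event carries failure details.
  final class QueryTerminated(val queryInfo: QueryInfo, val exception: Option[String])
    extends Event

  // Example consumer: there is no isActive flag to check, the event type says it all.
  def describe(event: Event): String = event match {
    case e: QueryStarted => s"${e.queryInfo.name} started"
    case e: QueryProgress => s"${e.queryInfo.name} made progress"
    case e: QueryTerminated =>
      e.exception match {
        case Some(reason) => s"${e.queryInfo.name} failed: $reason"
        case None => s"${e.queryInfo.name} stopped cleanly"
      }
  }
}
```

With this layout a listener cannot observe a "progress" event that carries an exception, which is the oddity raised in the review.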

}

+ /**
+ * :: Experimental ::
+ * A class that contains information about [[ContinuousQuery]].
Contributor

A class used to report information about the progress of a [[ContinuousQuery]].

+ */
+ @Experimental
+ case class ContinuousQueryInfo(
Contributor

We should scaladoc these parameters.

Contributor

We also probably don't want this to be a case class. The issue is that if we add new parameters in the future, the unapply method will break binary compatibility. The constructor should also be private.

Member Author

Removed case.
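
For readers unfamiliar with the concern, here is a minimal sketch of the alternative being asked for: a plain class with a private constructor and documented parameters. The names (`QueryInfo`, `create`, the field names) are hypothetical and simplified to strings so the example runs on its own. In Spark the factory would more likely be restricted with something like `private[sql]`.

```scala
object InfoSketch {
  /**
   * Hypothetical report object, written as a plain class rather than a case class.
   * A case class generates `apply`, `copy` and `unapply` whose signatures depend on
   * the parameter list, so adding a field later changes them for compiled callers.
   *
   * @param name               name of the query
   * @param sourceDescriptions one description per source
   * @param sinkDescription    description of the sink
   */
  final class QueryInfo private (
      val name: String,
      val sourceDescriptions: Seq[String],
      val sinkDescription: String) {
    override def toString: String =
      s"QueryInfo($name, $sourceDescriptions, $sinkDescription)"
  }

  object QueryInfo {
    // The companion can reach the private constructor; users cannot call `new` directly.
    // Public here only so the sketch is runnable standalone.
    def create(name: String, sources: Seq[String], sink: String): QueryInfo =
      new QueryInfo(name, sources, sink)
  }
}
```

New fields can then be added as extra `val`s without changing any synthetic method that user code may already be linked against.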

+ name: String,
+ isActive: Boolean,
+ sourceStatuses: Seq[SourceStatus],
+ sinkStatus: SinkStatus,
+ exception: Option[ContinuousQueryException])
Contributor

Is ContinuousQueryException really safe to serialize with the default serializer? Does Jackson obey transient?

Member Author

Good catch. We should use string instead of ContinuousQueryException.

Contributor

Though the stacktrace is also very useful. That might be safe to serialize.

Member Author

Replaced it with String.
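
One way to keep the stack trace while still exposing only strings is sketched below. This is an illustration under assumptions, not the change made in the PR. `TerminationInfo` and both helper methods are hypothetical, and the check uses plain Java serialization rather than Jackson.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream, PrintWriter, StringWriter}

object ExceptionInfoSketch {
  // Hypothetical payload: only strings, so no reference to the query or its plans leaks out.
  final class TerminationInfo(val message: String, val stackTrace: String) extends Serializable

  // Flattens a Throwable into its message and the text printStackTrace would emit.
  def fromThrowable(t: Throwable): TerminationInfo = {
    val buffer = new StringWriter()
    val writer = new PrintWriter(buffer)
    t.printStackTrace(writer)
    writer.flush()
    new TerminationInfo(String.valueOf(t.getMessage), buffer.toString)
  }

  // Serializes with Java serialization and returns the byte count;
  // throws NotSerializableException if the value is not serializable.
  def javaSerializedSize(value: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.size()
  }
}
```

The trade-off is that listeners lose the typed exception and can only inspect text.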

@@ -52,7 +52,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
Assert("Incorrect query status in onQueryStarted") {
val status = listener.startStatus
assert(status != null)
- assert(status.active == true)
+ assert(status.isActive == true)
assert(status.sourceStatuses.size === 1)
assert(status.sourceStatuses(0).description.contains("Memory"))

@@ -74,7 +74,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
assert(listener.progressStatuses.size === 1)
val status = listener.progressStatuses.peek()
assert(status != null)
- assert(status.active == true)
+ assert(status.isActive == true)
assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))

@@ -88,7 +88,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
val status = listener.terminationStatus
assert(status != null)

- assert(status.active === false) // must be inactive by the time onQueryTerm is called
+ assert(status.isActive === false) // must be inactive by the time onQueryTerm is called
assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
}
@@ -165,9 +165,9 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
// to catch errors in the async listener events
@volatile private var asyncTestWaiter = new Waiter

- @volatile var startStatus: QueryStatus = null
- @volatile var terminationStatus: QueryStatus = null
- val progressStatuses = new ConcurrentLinkedQueue[QueryStatus]
+ @volatile var startStatus: ContinuousQueryInfo = null
+ @volatile var terminationStatus: ContinuousQueryInfo = null
+ val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo]

def reset(): Unit = {
startStatus = null
@@ -183,35 +183,23 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with

override def onQueryStarted(queryStarted: QueryStarted): Unit = {
asyncTestWaiter {
- startStatus = QueryStatus(queryStarted.query)
+ startStatus = queryStarted.queryInfo
}
}

override def onQueryProgress(queryProgress: QueryProgress): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryProgress called before onQueryStarted")
- progressStatuses.add(QueryStatus(queryProgress.query))
+ progressStatuses.add(queryProgress.queryInfo)
}
}

override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
- terminationStatus = QueryStatus(queryTerminated.query)
+ terminationStatus = queryTerminated.queryInfo
Contributor

The exception is not tested.

Member Author

Added a test for the exception.

}
asyncTestWaiter.dismiss()
}
}

- case class QueryStatus(
- active: Boolean,
- exception: Option[Exception],
- sourceStatuses: Array[SourceStatus],
- sinkStatus: SinkStatus)
-
- object QueryStatus {
- def apply(query: ContinuousQuery): QueryStatus = {
- QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus)
- }
- }
}