Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-15580][SQL]Add ContinuousQueryInfo to make ContinuousQueryListener events serializable #13335

Closed
wants to merge 13 commits into from
Expand Up @@ -22,15 +22,13 @@ import org.apache.spark.sql.streaming.ContinuousQueryListener
import org.apache.spark.util.ListenerBus

/**
* A bus to forward events to [[ContinuousQueryListener]]s. This one will wrap received
* [[ContinuousQueryListener.Event]]s as WrappedContinuousQueryListenerEvents and send them to the
* Spark listener bus. It also registers itself with Spark listener bus, so that it can receive
* WrappedContinuousQueryListenerEvents, unwrap them as ContinuousQueryListener.Events and
* dispatch them to ContinuousQueryListener.
* A bus to forward events to [[ContinuousQueryListener]]s. This one will send received
* [[ContinuousQueryListener.Event]]s to the Spark listener bus. It also registers itself with
* Spark listener bus, so that it can receive [[ContinuousQueryListener.Event]]s and dispatch them
* to ContinuousQueryListener.
*/
class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
extends SparkListener
with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {

import ContinuousQueryListener._

Expand All @@ -45,13 +43,13 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
case s: QueryStarted =>
postToAll(s)
case _ =>
sparkListenerBus.post(new WrappedContinuousQueryListenerEvent(event))
sparkListenerBus.post(event)
}
}

override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case WrappedContinuousQueryListenerEvent(e) =>
case e: ContinuousQueryListener.Event =>
postToAll(e)
case _ =>
}
Expand All @@ -71,15 +69,4 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
}
}

/**
* Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark
* listener bus.
*/
private case class WrappedContinuousQueryListenerEvent(
streamingListenerEvent: ContinuousQueryListener.Event)
extends SparkListenerEvent {

// Do not log streaming events in event log as history server does not support these events.
protected[spark] override def logEvent: Boolean = false
}
}
Expand Up @@ -131,12 +131,13 @@ class StreamExecution(
/** Returns current status of all the sources. */
override def sourceStatuses: Array[SourceStatus] = {
  // Snapshot availableOffsets once so every SourceStatus reflects the same point in time,
  // even if the stream makes progress while we build the array.
  val localAvailableOffsets = availableOffsets
  sources.map(s =>
    new SourceStatus(s.toString, localAvailableOffsets.get(s).map(_.toString))).toArray
}

/** Returns current status of the sink, describing the committed composite offset as a string. */
override def sinkStatus: SinkStatus =
  new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources).toString)

/** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */
override def exception: Option[ContinuousQueryException] = Option(streamDeathCause)
Expand Down Expand Up @@ -167,7 +168,7 @@ class StreamExecution(
// Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
// so must mark this as ACTIVE first.
state = ACTIVE
postEvent(new QueryStarted(this)) // Assumption: Does not throw exception.
postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw exception.

// Unblock starting thread
startLatch.countDown()
Expand Down Expand Up @@ -206,7 +207,10 @@ class StreamExecution(
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(new QueryTerminated(this))
postEvent(new QueryTerminated(
this.toInfo,
exception.map(_.getMessage),
exception.map(_.getStackTrace.toSeq).getOrElse(Nil)))
terminationLatch.countDown()
}
}
Expand Down Expand Up @@ -374,7 +378,7 @@ class StreamExecution(
logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
// Update committed offsets.
committedOffsets ++= availableOffsets
postEvent(new QueryProgress(this))
postEvent(new QueryProgress(this.toInfo))
}

private def postEvent(event: ContinuousQueryListener.Event) {
Expand Down Expand Up @@ -484,6 +488,13 @@ class StreamExecution(
""".stripMargin
}

/**
 * Builds an immutable [[ContinuousQueryInfo]] snapshot of this query for listener events.
 * No deep copy of `sourceStatuses` / `sinkStatus` is needed: they are immutable
 * (confirmed in review discussion).
 */
private def toInfo: ContinuousQueryInfo = {
  new ContinuousQueryInfo(
    this.name,
    this.sourceStatuses,
    this.sinkStatus)
}

trait State
case object INITIALIZED extends State
case object ACTIVE extends State
Expand Down
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming

import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * A class used to report information about the progress of a [[ContinuousQuery]].
 *
 * @param name The [[ContinuousQuery]] name.
 * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources.
 * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
 * @since 2.0.0
 */
@Experimental
class ContinuousQueryInfo private[sql](
    val name: String,
    // Seq rather than Array, consistent with SparkListener events; it also prevents listeners
    // from accidentally mutating shared contents (per review discussion).
    val sourceStatuses: Seq[SourceStatus],
    val sinkStatus: SinkStatus)
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming

import org.apache.spark.annotation.Experimental
import org.apache.spark.scheduler.SparkListenerEvent

/**
* :: Experimental ::
Expand Down Expand Up @@ -70,26 +71,43 @@ abstract class ContinuousQueryListener {
object ContinuousQueryListener {

/**
 * :: Experimental ::
 * Base type of [[ContinuousQueryListener]] events. Extends [[SparkListenerEvent]] so events
 * can be posted directly to the Spark listener bus without a wrapper.
 * @since 2.0.0
 */
@Experimental
trait Event extends SparkListenerEvent

/**
 * :: Experimental ::
 * Event representing the start of a query.
 * @since 2.0.0
 */
@Experimental
class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event
Copy link
Contributor

@tdas tdas Jun 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't these also be marked as Experimental?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


/**
 * :: Experimental ::
 * Event representing any progress updates in a query.
 * @since 2.0.0
 */
@Experimental
class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event

/**
 * :: Experimental ::
 * Event representing the termination of a query.
 *
 * @param queryInfo Information about the status of the query.
 * @param exception The exception message of the [[ContinuousQuery]] if the query was terminated
 *                  with an exception. Otherwise, it will be `None`.
 * @param stackTrace The stack trace of the exception if the query was terminated with an
 *                   exception. It will be empty if there was no error.
 * @since 2.0.0
 */
@Experimental
class QueryTerminated private[sql](
    val queryInfo: ContinuousQueryInfo,
    val exception: Option[String],
    val stackTrace: Seq[StackTraceElement]) extends Event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it more Java-friendly to make this an Array instead of a Seq? It does not need to be a Seq.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to expose a mutable Array to the user. Secondly, this one is not Java friendly already since it uses Option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's mutable about this Array? And for Option, there is no workaround — and Option is pretty simple to deal with in Java. Seq is more annoying; you need to look up the Scala docs, etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g., the user can change the content of an array.

}
Expand Up @@ -18,17 +18,17 @@
package org.apache.spark.sql.streaming

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{Offset, Sink}
import org.apache.spark.sql.execution.streaming.Sink

/**
 * :: Experimental ::
 * Status and metrics of a streaming [[Sink]].
 *
 * @param description Description of the sink corresponding to this status
 * @param offsetDesc Description of the current offset up to which data has been written by the sink
 * @since 2.0.0
 */
@Experimental
class SinkStatus private[sql](
    val description: String,
    val offsetDesc: String)
Expand Up @@ -18,17 +18,17 @@
package org.apache.spark.sql.streaming

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.execution.streaming.Source

/**
 * :: Experimental ::
 * Status and metrics of a streaming [[Source]].
 *
 * @param description Description of the source corresponding to this status
 * @param offsetDesc Description of the current [[Source]] offset, if known
 * @since 2.0.0
 */
@Experimental
class SourceStatus private[sql] (
    val description: String,
    val offsetDesc: Option[String])