Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-13146][SQL] Management API for continuous queries #11030

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,84 @@

package org.apache.spark.sql

import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * A handle to a query that is executing continuously in the background as new data arrives.
 * All these methods are thread-safe.
 * @since 2.0.0
 */
@Experimental
trait ContinuousQuery {

  /**
   * Returns the name of the query. This name is unique across all active queries in the
   * associated [[SQLContext]].
   * @since 2.0.0
   */
  def name: String

  /**
   * Returns the [[SQLContext]] associated with `this` query.
   * @since 2.0.0
   */
  def sqlContext: SQLContext

  /**
   * Returns whether the query is currently active or not.
   * @since 2.0.0
   */
  def isActive: Boolean

  /**
   * Returns the [[ContinuousQueryException]] if the query was terminated by an exception,
   * or `None` otherwise.
   * @since 2.0.0
   */
  def exception: Option[ContinuousQueryException]

  /**
   * Returns the current statuses of all the sources of this query.
   * @since 2.0.0
   */
  def sourceStatuses: Array[SourceStatus]

  /**
   * Returns the current status of the sink of this query.
   * @since 2.0.0
   */
  def sinkStatus: SinkStatus

  /**
   * Waits for the termination of `this` query, either by `query.stop()` or by an exception.
   * If the query has terminated with an exception, then the exception will be thrown.
   *
   * If the query has already terminated, then all subsequent calls to this method will either
   * return immediately (if the query was terminated by `stop()`), or throw the exception
   * immediately (if the query has terminated with an exception).
   *
   * @throws ContinuousQueryException if `this` query has terminated with an exception
   * @since 2.0.0
   */
  def awaitTermination(): Unit

  /**
   * Waits for the termination of `this` query, either by `query.stop()` or by an exception,
   * for at most `timeoutMs` milliseconds. If the query has terminated with an exception,
   * then the exception will be thrown. Otherwise, it returns whether the query has terminated
   * within the timeout.
   *
   * If the query has already terminated, then all subsequent calls to this method will either
   * return `true` immediately (if the query was terminated by `stop()`), or throw the exception
   * immediately (if the query has terminated with an exception).
   *
   * @param timeoutMs maximum time to wait for termination, in milliseconds
   * @return `true` if the query has terminated, `false` if the wait timed out
   * @throws ContinuousQueryException if `this` query has terminated with an exception
   * @since 2.0.0
   */
  def awaitTermination(timeoutMs: Long): Boolean

  /**
   * Stops the execution of this query if it is running. This method blocks until the threads
   * performing execution have stopped.
   * @since 2.0.0
   */
  def stop(): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}

/**
 * :: Experimental ::
 * Exception that stopped a [[ContinuousQuery]].
 * @param query Query that caused the exception
 * @param message Message of this exception
 * @param cause Internal cause of this exception
 * @param startOffset Starting offset (if known) of the range of data in which exception occurred
 * @param endOffset Ending offset (if known) of the range of data in which exception occurred
 * @since 2.0.0
 */
@Experimental
class ContinuousQueryException private[sql](
    val query: ContinuousQuery,
    val message: String,
    val cause: Throwable,
    val startOffset: Option[Offset] = None,
    val endOffset: Option[Offset] = None
  ) extends Exception(message, cause) {

  /** Time when the exception occurred, in milliseconds since the epoch. */
  val time: Long = System.currentTimeMillis

  override def toString(): String = {
    // Render the cause's message plus its top ten stack frames. Guard against a null
    // cause, which `Exception(message, cause)` permits; the original unconditionally
    // dereferenced `cause` and would NPE from inside toString.
    val causeStr = Option(cause).fold("") { c =>
      s"${c.getMessage} ${c.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
    }
    // Pattern match instead of a blind asInstanceOf cast: a ClassCastException thrown
    // from toString would mask the original failure for non-StreamExecution queries.
    val queryStr = query match {
      case se: StreamExecution => se.toDebugString
      case other => String.valueOf(other)
    }
    s"""
       |$causeStr
       |
       |$queryStr
       """.stripMargin
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import scala.collection.mutable

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{ContinuousQueryListenerBus, Sink, StreamExecution}
import org.apache.spark.sql.util.ContinuousQueryListener

/**
 * :: Experimental ::
 * A class to manage all the [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active
 * on a [[SQLContext]].
 *
 * @since 2.0.0
 */
@Experimental
class ContinuousQueryManager(sqlContext: SQLContext) {

  private val listenerBus = new ContinuousQueryListenerBus(sqlContext.sparkContext.listenerBus)

  // Guards all access to `activeQueries`.
  private val activeQueriesLock = new Object
  private val activeQueries = new mutable.HashMap[String, ContinuousQuery]

  // Guards `lastTerminatedQuery`; `notifyQueryTermination` calls `notifyAll()` on this
  // monitor whenever a query terminates.
  private val awaitTerminationLock = new Object
  private var lastTerminatedQuery: ContinuousQuery = null

  /**
   * Returns a list of active queries associated with this SQLContext.
   *
   * @since 2.0.0
   */
  def active: Array[ContinuousQuery] = activeQueriesLock.synchronized {
    activeQueries.values.toArray
  }

  /**
   * Returns the active query with the given name from this SQLContext.
   *
   * @throws IllegalArgumentException if there is no active query with that name
   * @since 2.0.0
   */
  def get(name: String): ContinuousQuery = activeQueriesLock.synchronized {
    activeQueries.getOrElse(name,
      throw new IllegalArgumentException(s"There is no active query with name $name"))
  }

  /**
   * Wait until any of the queries on the associated SQLContext has terminated since the
   * creation of the context, or since `resetTerminated()` was called. If any query was terminated
   * with an exception, then the exception will be thrown.
   *
   * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
   * return immediately (if the query was terminated by `query.stop()`),
   * or throw the exception immediately (if the query was terminated with exception). Use
   * `resetTerminated()` to clear past terminations and wait for new terminations.
   *
   * In the case where multiple queries have terminated since `resetTerminated()` was called,
   * if any query has terminated with exception, then `awaitAnyTermination()` will
   * throw any of the exception. For correctly documenting exceptions across multiple queries,
   * users need to stop all of them after any of them terminates with exception, and then check
   * the `query.exception()` for each query.
   *
   * @throws ContinuousQueryException if any query has terminated with an exception
   * @since 2.0.0
   */
  def awaitAnyTermination(): Unit = {
    awaitTerminationLock.synchronized {
      // `notifyQueryTermination` signals via `notifyAll()` while holding this same monitor,
      // so an untimed `wait()` cannot miss the wakeup. The previous `wait(10)` polling loop
      // (flagged in review) woke up 100 times a second for no additional safety.
      while (lastTerminatedQuery == null) {
        awaitTerminationLock.wait()
      }
      if (lastTerminatedQuery.exception.nonEmpty) {
        throw lastTerminatedQuery.exception.get
      }
    }
  }

  /**
   * Wait until any of the queries on the associated SQLContext has terminated since the
   * creation of the context, or since `resetTerminated()` was called. Returns whether any query
   * has terminated or not (multiple may have terminated). If any query has terminated with an
   * exception, then the exception will be thrown.
   *
   * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
   * return `true` immediately (if the query was terminated by `query.stop()`),
   * or throw the exception immediately (if the query was terminated with exception). Use
   * `resetTerminated()` to clear past terminations and wait for new terminations.
   *
   * In the case where multiple queries have terminated since `resetTerminated()` was called,
   * if any query has terminated with exception, then `awaitAnyTermination()` will
   * throw any of the exception. For correctly documenting exceptions across multiple queries,
   * users need to stop all of them after any of them terminates with exception, and then check
   * the `query.exception()` for each query.
   *
   * @param timeoutMs maximum time to wait for a termination, in milliseconds
   * @return `true` if any query has terminated, `false` if the wait timed out
   * @throws ContinuousQueryException if any query has terminated with an exception
   * @since 2.0.0
   */
  def awaitAnyTermination(timeoutMs: Long): Boolean = {
    // Use the monotonic clock so wall-clock adjustments cannot distort the timeout.
    val deadlineNanos = System.nanoTime() + timeoutMs * 1000000L
    def remainingMs: Long = (deadlineNanos - System.nanoTime()) / 1000000L

    awaitTerminationLock.synchronized {
      var waitMs = remainingMs
      while (lastTerminatedQuery == null && waitMs > 0) {
        // Wait only for the remaining time; `notifyAll()` in `notifyQueryTermination`
        // wakes us early when a query terminates. The `waitMs > 0` guard matters because
        // `wait(0)` would block indefinitely.
        awaitTerminationLock.wait(waitMs)
        waitMs = remainingMs
      }
      if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) {
        throw lastTerminatedQuery.exception.get
      }
      lastTerminatedQuery != null
    }
  }

  /**
   * Forget about past terminated queries so that `awaitAnyTermination()` can be used again to
   * wait for new terminations.
   *
   * @since 2.0.0
   */
  def resetTerminated(): Unit = {
    awaitTerminationLock.synchronized {
      lastTerminatedQuery = null
    }
  }

  /**
   * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle events of
   * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]].
   *
   * @since 2.0.0
   */
  def addListener(listener: ContinuousQueryListener): Unit = {
    listenerBus.addListener(listener)
  }

  /**
   * Deregister a [[ContinuousQueryListener]].
   *
   * @since 2.0.0
   */
  def removeListener(listener: ContinuousQueryListener): Unit = {
    listenerBus.removeListener(listener)
  }

  /** Post a listener event to the listener bus. */
  private[sql] def postListenerEvent(event: ContinuousQueryListener.Event): Unit = {
    listenerBus.post(event)
  }

  /**
   * Start a query with the given name over the given DataFrame, writing to the given sink.
   *
   * @throws IllegalArgumentException if a query with the same name is already active
   */
  private[sql] def startQuery(name: String, df: DataFrame, sink: Sink): ContinuousQuery = {
    activeQueriesLock.synchronized {
      if (activeQueries.contains(name)) {
        throw new IllegalArgumentException(
          s"Cannot start query with name $name as a query with that name is already active")
      }
      val query = new StreamExecution(sqlContext, name, df.logicalPlan, sink)
      query.start()
      activeQueries.put(name, query)
      query
    }
  }

  /** Notify (by the ContinuousQuery) that the query has been terminated. */
  private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = {
    activeQueriesLock.synchronized {
      activeQueries -= terminatedQuery.name
    }
    awaitTerminationLock.synchronized {
      // Prefer remembering a query that failed with an exception, so that
      // `awaitAnyTermination` surfaces the failure rather than a clean stop.
      if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
        lastTerminatedQuery = terminatedQuery
      }
      awaitTerminationLock.notifyAll()
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,17 @@ final class DataFrameWriter private[sql](df: DataFrame) {
df)
}

/**
* Specifies the name of the [[ContinuousQuery]] that can be started with `stream()`.
* This name must be unique among all the currently active queries in the associated SQLContext.
*
* @since 2.0.0
*/
def queryName(queryName: String): DataFrameWriter = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not tested yet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where should I put the test! There isnt any suite that tests the functionality of the DataFrameWriter.stream()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.extraOptions += ("queryName" -> queryName)
this
}

/**
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
Expand All @@ -230,7 +241,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
extraOptions.toMap,
normalizedParCols.getOrElse(Nil))

new StreamExecution(df.sqlContext, df.logicalPlan, sink)
df.sqlContext.continuousQueryManager.startQuery(
extraOptions.getOrElse("queryName", StreamExecution.nextName), df, sink)
}

/**
Expand Down
12 changes: 12 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ class SQLContext private[sql](
@transient
lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager

// Lazily created manager for all continuous queries started on this context.
// NOTE(review): sibling lazy vals such as `listenerManager` are annotated @transient —
// confirm whether this member should be @transient as well.
protected[sql] lazy val continuousQueryManager = new ContinuousQueryManager(this)

@transient
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)

Expand Down Expand Up @@ -835,6 +837,16 @@ class SQLContext private[sql](
DataFrame(this, ShowTablesCommand(Some(databaseName)))
}

/**
 * Returns a [[ContinuousQueryManager]] that allows managing all the
 * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this` context.
 *
 * @since 2.0.0
 */
def streams: ContinuousQueryManager = continuousQueryManager

/**
* Returns the names of tables in the current database as an array.
*
Expand Down
Loading