
[SPARK-15686][SQL] Move user-facing streaming classes into sql.streaming
## What changes were proposed in this pull request?
This patch moves all user-facing structured streaming classes into sql.streaming. As part of this, I also added `@since` version annotations to methods and classes that were missing them.
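
The practical effect on user code is an import-path change. A minimal sketch, using only class names that appear in this diff (old locations shown in comments):

```scala
// Before this patch, these user-facing classes lived directly under org.apache.spark.sql
// (with ContinuousQueryListener under org.apache.spark.sql.util):
//   import org.apache.spark.sql.{ContinuousQuery, OutputMode, ProcessingTime}
//   import org.apache.spark.sql.util.ContinuousQueryListener
// After this patch they are imported from org.apache.spark.sql.streaming:
import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryListener, OutputMode, ProcessingTime}
```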

## How was this patch tested?
Updated tests to reflect the moves.

Author: Reynold Xin <rxin@databricks.com>

Closes #13429 from rxin/SPARK-15686.
rxin authored and marmbrus committed Jun 1, 2016
1 parent d5012c2 commit a71d136
Showing 42 changed files with 121 additions and 74 deletions.
3 changes: 2 additions & 1 deletion python/pyspark/sql/streaming.py
@@ -201,7 +201,8 @@ def __init__(self, interval):
self.interval = interval

def _to_java_trigger(self, sqlContext):
return sqlContext._sc._jvm.org.apache.spark.sql.ProcessingTime.create(self.interval)
return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
self.interval)


def _test():
2 changes: 1 addition & 1 deletion python/pyspark/sql/utils.py
@@ -71,7 +71,7 @@ def deco(*a, **kw):
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
raise ParseException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.ContinuousQueryException: '):
if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '):
raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
@@ -15,9 +15,10 @@
* limitations under the License.
*/

package org.apache.spark.sql;
package org.apache.spark.sql.streaming;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.InternalOutputModes;

/**
* :: Experimental ::
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.streaming.OutputMode

/**
* Internal helper class to generate objects representing various [[OutputMode]]s,
*/
@@ -17,9 +17,10 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.{AnalysisException, InternalOutputModes, OutputMode}
import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.OutputMode

/**
* Analyzes the presence of unsupported operations in a logical plan.
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql;
package org.apache.spark.sql.streaming;

import org.junit.Test;

@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{AnalysisException, OutputMode}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.InternalOutputModes._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.IntegerType

/** A dummy command for testing unsupported operations. */
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingA
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Utils

/**
1 change: 1 addition & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.ContinuousQuery
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
6 changes: 2 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -30,13 +30,11 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming.ContinuousQueryManager
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager

@@ -645,7 +643,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)

/**
* Returns a [[ContinuousQueryManager]] that allows managing all the
* [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this` context.
* [[org.apache.spark.sql.streaming.ContinuousQuery ContinuousQueries]] active on `this` context.
*
* @since 2.0.0
*/
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
@@ -40,8 +40,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf}
import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
import org.apache.spark.util.Utils
@@ -182,7 +183,7 @@ class SparkSession private(
/**
* :: Experimental ::
* Returns a [[ContinuousQueryManager]] that allows managing all the
* [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this`.
* [[ContinuousQuery ContinuousQueries]] active on `this`.
*
* @group basic
* @since 2.0.0
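
The hunk above updates the Scaladoc link on the SparkSession method that returns the query manager. For context, a hypothetical usage sketch; the `streams`, `active`, and `name` members are assumed from the 2.0.0-preview API and are not part of this diff:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ContinuousQueryManager

// Assumed API sketch: obtain the manager for all ContinuousQueries active on this session.
val spark = SparkSession.builder().appName("streams-demo").getOrCreate()
val manager: ContinuousQueryManager = spark.streams
// Assumed members: `active` lists running queries; `name` identifies each one.
manager.active.foreach(q => println(q.name))
```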
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming.MemoryPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.ContinuousQuery

private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SparkPlanner =>
@@ -201,7 +202,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

/**
* Used to plan aggregation queries that are computed incrementally as part of a
* [[org.apache.spark.sql.ContinuousQuery]]. Currently this rule is injected into the planner
* [[ContinuousQuery]]. Currently this rule is injected into the planner
* on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]]
*/
object StatefulAggregationStrategy extends Strategy {
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils

@@ -18,8 +18,7 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.sql.util.ContinuousQueryListener._
import org.apache.spark.sql.streaming.ContinuousQueryListener
import org.apache.spark.util.ListenerBus

/**
@@ -30,7 +29,10 @@ import org.apache.spark.util.ListenerBus
* dispatch them to ContinuousQueryListener.
*/
class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
extends SparkListener
with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {

import ContinuousQueryListener._

sparkListenerBus.addListener(this)

@@ -74,7 +76,8 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
* listener bus.
*/
private case class WrappedContinuousQueryListenerEvent(
streamingListenerEvent: ContinuousQueryListener.Event) extends SparkListenerEvent {
streamingListenerEvent: ContinuousQueryListener.Event)
extends SparkListenerEvent {

// Do not log streaming events in event log as history server does not support these events.
protected[spark] override def logEvent: Boolean = false
@@ -17,10 +17,11 @@

package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.{InternalOutputModes, OutputMode, SparkSession}
import org.apache.spark.sql.{InternalOutputModes, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
import org.apache.spark.sql.streaming.OutputMode

/**
* A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
@@ -33,8 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.sql.util.ContinuousQueryListener._
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

/**
@@ -54,6 +53,8 @@ class StreamExecution(
val outputMode: OutputMode)
extends ContinuousQuery with Logging {

import org.apache.spark.sql.streaming.ContinuousQueryListener._

/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
*/
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
import org.apache.spark.sql.ProcessingTime
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.util.{Clock, SystemClock}

trait TriggerExecutor {
@@ -18,8 +18,9 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, OutputMode, SQLContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
// Number of rows to display, by default 20 rows
@@ -28,6 +28,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

object MemoryStream {
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.AnalyzeTableCommand
import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryManager}
import org.apache.spark.sql.util.ExecutionListenerManager


@@ -142,7 +143,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager

/**
* Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
* Interface to start and stop [[ContinuousQuery]]s.
*/
lazy val continuousQueryManager: ContinuousQueryManager = {
new ContinuousQueryManager(sparkSession)
@@ -23,6 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

/**
@@ -15,9 +15,10 @@
* limitations under the License.
*/

package org.apache.spark.sql
package org.apache.spark.sql.streaming

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.SparkSession

/**
* :: Experimental ::
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql
package org.apache.spark.sql.streaming

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
@@ -15,27 +15,30 @@
* limitations under the License.
*/

package org.apache.spark.sql.util
package org.apache.spark.sql.streaming

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.ContinuousQuery
import org.apache.spark.sql.util.ContinuousQueryListener._

/**
* :: Experimental ::
* Interface for listening to events related to [[ContinuousQuery ContinuousQueries]].
* @note The methods are not thread-safe as they may be called from different threads.
*
* @since 2.0.0
*/
@Experimental
abstract class ContinuousQueryListener {

import ContinuousQueryListener._

/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]],
* that is, `onQueryStart` will be called on all listeners before
* `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please
* don't block this method as it will block your query.
* @since 2.0.0
*/
def onQueryStarted(queryStarted: QueryStarted): Unit

@@ -46,30 +49,47 @@ abstract class ContinuousQueryListener {
* latest no matter when this method is called. Therefore, the status of [[ContinuousQuery]]
* may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]]
* is terminated when you are processing [[QueryProgress]].
* @since 2.0.0
*/
def onQueryProgress(queryProgress: QueryProgress): Unit

/** Called when a query is stopped, with or without error */
/**
* Called when a query is stopped, with or without error.
* @since 2.0.0
*/
def onQueryTerminated(queryTerminated: QueryTerminated): Unit
}


/**
* :: Experimental ::
* Companion object of [[ContinuousQueryListener]] that defines the listener events.
* @since 2.0.0
*/
@Experimental
object ContinuousQueryListener {

/** Base type of [[ContinuousQueryListener]] events */
/**
* Base type of [[ContinuousQueryListener]] events.
* @since 2.0.0
*/
trait Event

/** Event representing the start of a query */
/**
* Event representing the start of a query.
* @since 2.0.0
*/
class QueryStarted private[sql](val query: ContinuousQuery) extends Event

/** Event representing any progress updates in a query */
/**
* Event representing any progress updates in a query.
* @since 2.0.0
*/
class QueryProgress private[sql](val query: ContinuousQuery) extends Event

/** Event representing that termination of a query */
/**
* Event representing the termination of a query.
* @since 2.0.0
*/
class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
}
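
The listener contract documented above maps directly onto a small implementation. A minimal sketch against the relocated API; the println bodies are illustrative only, and registration via `addListener` on the query manager is an assumption not shown in this diff:

```scala
import org.apache.spark.sql.streaming.ContinuousQueryListener
import org.apache.spark.sql.streaming.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

// Sketch: log lifecycle events. Keep onQueryStarted fast, since it is called
// synchronously with DataFrameWriter.startStream(), as noted in the Scaladoc above.
class ConsoleQueryListener extends ContinuousQueryListener {
  override def onQueryStarted(queryStarted: QueryStarted): Unit =
    println(s"Query started: ${queryStarted.query}")

  override def onQueryProgress(queryProgress: QueryProgress): Unit =
    println(s"Query progress: ${queryProgress.query}")

  override def onQueryTerminated(queryTerminated: QueryTerminated): Unit =
    println(s"Query terminated: ${queryTerminated.query}")
}
```

Registration would then look roughly like `spark.streams.addListener(new ConsoleQueryListener)`, assuming the manager exposes an `addListener` method.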
