Skip to content

Commit

Permalink
[SPARK-29543][SS][FOLLOWUP] Move spark.sql.streaming.ui.* configs t…
Browse files Browse the repository at this point in the history
…o StaticSQLConf

### What changes were proposed in this pull request?
Put the configs below needed by Structured Streaming UI into StaticSQLConf:

- spark.sql.streaming.ui.enabled
- spark.sql.streaming.ui.retainedProgressUpdates
- spark.sql.streaming.ui.retainedQueries

### Why are the changes needed?
Make all SS UI configs consistent with other similar configs in usage and naming.

### Does this PR introduce any user-facing change?
Yes, add new static config `spark.sql.streaming.ui.retainedProgressUpdates`.

### How was this patch tested?
Existing UT.

Closes #27425 from xuanyuanking/SPARK-29543-follow.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
(cherry picked from commit a4912ce)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
  • Loading branch information
xuanyuanking authored and zsxwing committed Feb 3, 2020
1 parent 91f78ae commit f9b8637
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1150,18 +1150,6 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STREAMING_UI_ENABLED =
buildConf("spark.sql.streaming.ui.enabled")
.doc("Whether to run the structured streaming UI for the Spark application.")
.booleanConf
.createWithDefault(true)

val STREAMING_UI_INACTIVE_QUERY_RETENTION =
buildConf("spark.sql.streaming.ui.numInactiveQueries")
.doc("The number of inactive queries to retain for structured streaming ui.")
.intConf
.createWithDefault(100)

val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.")
Expand Down Expand Up @@ -2284,10 +2272,6 @@ class SQLConf extends Serializable with Logging {

def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)

def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)

def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)

def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,24 @@ object StaticSQLConf {
.internal()
.booleanConf
.createWithDefault(true)

val STREAMING_UI_ENABLED =
buildStaticConf("spark.sql.streaming.ui.enabled")
.doc("Whether to run the Structured Streaming Web UI for the Spark application when the " +
"Spark Web UI is enabled.")
.booleanConf
.createWithDefault(true)

val STREAMING_UI_RETAINED_PROGRESS_UPDATES =
buildStaticConf("spark.sql.streaming.ui.retainedProgressUpdates")
.doc("The number of progress updates to retain for a streaming query for Structured " +
"Streaming UI.")
.intConf
.createWithDefault(100)

val STREAMING_UI_RETAINED_QUERIES =
buildStaticConf("spark.sql.streaming.ui.retainedQueries")
.doc("The number of inactive queries to retain for Structured Streaming UI.")
.intConf
.createWithDefault(100)
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,14 @@ private[sql] class SharedState(
* data to show.
*/
lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
val sqlConf = SQLConf.get
if (sqlConf.isStreamingUIEnabled) {
val statusListener = new StreamingQueryStatusListener(sqlConf)
sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _))
Some(statusListener)
} else {
None
sparkContext.ui.flatMap { ui =>
if (conf.get(STREAMING_UI_ENABLED)) {
val statusListener = new StreamingQueryStatusListener(conf)
new StreamingQueryTab(statusListener, ui)
Some(statusListener)
} else {
None
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}

/**
* A customized StreamingQueryListener used in structured streaming UI, which contains all
* UI data for both active and inactive query.
* TODO: Add support for history server.
*/
private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
Expand All @@ -45,8 +46,9 @@ private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends Stream
private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()

private val streamingProgressRetention = sqlConf.streamingProgressRetention
private val inactiveQueryStatusRetention = sqlConf.streamingUIInactiveQueryRetention
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)

override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
activeQueryStatus.putIfAbsent(event.runId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ private[sql] class StreamingQueryTab(
parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql")
}

object StreamingQueryTab {
private[sql] object StreamingQueryTab {
private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.streaming
class StreamingQueryStatusListenerSuite extends StreamTest {

test("onQueryStarted, onQueryProgress, onQueryTerminated") {
val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)

// hanlde query started event
val id = UUID.randomUUID()
Expand Down Expand Up @@ -73,7 +73,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
}

test("same query start multiple times") {
val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)

// handle first time start
val id = UUID.randomUUID()
Expand Down

0 comments on commit f9b8637

Please sign in to comment.