Merge pull request #22 from xuanyuanking/SPARK-29543
address comments
uncleGen committed Jan 26, 2020
2 parents dd5ca20 + b3697a2 commit cb1e68c
Showing 9 changed files with 41 additions and 32 deletions.

SQLConf.scala
@@ -1127,6 +1127,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

+ val STREAMING_UI_ENABLED =
+ buildConf("spark.sql.streaming.ui.enabled")
+ .doc("Whether to run the structured streaming UI for the Spark application.")
+ .booleanConf
+ .createWithDefault(true)

val STREAMING_UI_INACTIVE_QUERY_RETENTION =
buildConf("spark.sql.streaming.ui.numInactiveQueries")
.doc("The number of inactive queries to retain for structured streaming ui.")
@@ -2255,6 +2261,8 @@ class SQLConf extends Serializable with Logging {

def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

+ def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)

def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)

def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
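
The new spark.sql.streaming.ui.enabled flag defaults to true, so the Structured Streaming tab appears whenever the Spark web UI itself is running. A minimal usage sketch for turning the tab off for a single application (not part of this diff; the master and app name below are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")                                 // placeholder master
  .appName("streaming-ui-demo")                       // placeholder app name
  .config("spark.sql.streaming.ui.enabled", "false")  // config added above; defaults to true
  .getOrCreate()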

StreamExecution.scala
@@ -307,8 +307,8 @@ abstract class StreamExecution(
}

// `postEvent` does not throw non fatal exception.
- val submitTime = triggerClock.getTimeMillis()
- postEvent(new QueryStartedEvent(id, runId, name, submitTime))
+ val submissionTime = triggerClock.getTimeMillis()
+ postEvent(new QueryStartedEvent(id, runId, name, submissionTime))

// Unblock starting thread
startLatch.countDown()

SharedState.scala
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FsUrlStreamHandlerFactory

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
- import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.CacheManager
@@ -145,14 +144,16 @@ private[sql] class SharedState(
* A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui
* data to show.
*/
- lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] =
- if (conf.get(UI_ENABLED)) {
- val statusListener = new StreamingQueryStatusListener(SQLConf.get)
+ lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
+ val sqlConf = SQLConf.get
+ if (sqlConf.isStreamingUIEnabled) {
+ val statusListener = new StreamingQueryStatusListener(sqlConf)
sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _))
Some(statusListener)
} else {
None
}
+ }

/**
* A catalog that interacts with external systems.

StreamingQueryListener.scala
@@ -82,15 +82,15 @@ object StreamingQueryListener {
* @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param name User-specified name of the query, null if not specified.
- * @param submitTime The timestamp to start a query.
+ * @param submissionTime The timestamp to start a query.
* @since 2.1.0
*/
@Evolving
class QueryStartedEvent private[sql](
val id: UUID,
val runId: UUID,
val name: String,
- val submitTime: Long) extends Event
+ val submissionTime: Long) extends Event

/**
* Event representing any progress updates in a query.
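
QueryStartedEvent is public listener API, so the rename from submitTime to submissionTime is visible to user code. A minimal sketch of a listener reading the renamed field (not from this diff; the class name and output format are illustrative only):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class SubmissionTimeLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    // submissionTime (formerly submitTime) is the trigger-clock timestamp in epoch millis
    println(s"query ${event.id} (run ${event.runId}) submitted at ${event.submissionTime}")
  }
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// register with: spark.streams.addListener(new SubmissionTimeLogger)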

StreamingQueryPage.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}

- class StreamingQueryPage(parent: StreamingQueryTab)
+ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
extends WebUIPage("") with Logging {
val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
df.setTimeZone(getTimeZone("UTC"))
@@ -61,11 +61,11 @@ class StreamingQueryPage(parent: StreamingQueryTab)
val name = UIUtils.getQueryName(query)
val status = UIUtils.getQueryStatus(query)
val duration = if (queryActive) {
- SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submitTime)
+ SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
} else {
withNoProgress(query, {
val endTimeMs = query.lastProgress.timestamp
- SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submitTime)
+ SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submissionTime)
}, "-")
}

@@ -74,7 +74,7 @@ class StreamingQueryPage(parent: StreamingQueryTab)
<td> {status} </td>
<td> {query.id} </td>
<td> <a href={statisticsLink}> {query.runId} </a> </td>
- <td> {SparkUIUtils.formatDate(query.submitTime)} </td>
+ <td> {SparkUIUtils.formatDate(query.submissionTime)} </td>
<td> {duration} </td>
<td> {withNoProgress(query, {
(query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
@@ -94,8 +94,8 @@ class StreamingQueryPage(parent: StreamingQueryTab)
.partition(_.isActive)
val activeQueryTables = if (activeQueries.nonEmpty) {
val headerRow = Seq(
- "Query Name", "Status", "Id", "Run ID", "Submit Time", "Duration", "Avg Input /sec",
- "Avg Process /sec", "Last Batch ID")
+ "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec",
+ "Avg Process /sec", "Lastest Batch")

Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true),
activeQueries, true, None, Seq(null), false))
@@ -105,8 +105,8 @@ class StreamingQueryPage(parent: StreamingQueryTab)

val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
val headerRow = Seq(
- "Query Name", "Status", "Id", "Run ID", "Submit Time", "Duration", "Avg Input /sec",
- "Avg Process /sec", "Last Batch ID", "Error")
+ "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec",
+ "Avg Process /sec", "Lastest Batch", "Error")

Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false),
inactiveQueries, true, None, Seq(null), false))

StreamingQueryStatisticsPage.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}

- class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
+ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
extends WebUIPage("statistics") with Logging {
val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
df.setTimeZone(getTimeZone("UTC"))
@@ -82,7 +82,7 @@ class StreamingQueryStatisticsPage(parent: StreamingQueryTab)

def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
val duration = if (query.isActive) {
- SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submitTime)
+ SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
} else {
withNoProgress(query, {
val end = query.lastProgress.timestamp
@@ -100,7 +100,7 @@ class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
</strong>
since
<strong>
- {SparkUIUtils.formatDate(query.submitTime)}
+ {SparkUIUtils.formatDate(query.submissionTime)}
</strong>
(<strong>{numBatches}</strong> completed batches)
</div>

StreamingQueryStatusListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
* UI data for both active and inactive query.
* TODO: Add support for history server.
*/
- class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
+ private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -50,7 +50,7 @@ class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {

override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId, event.submitTime))
+ new StreamingQueryUIData(event.name, event.id, event.runId, event.submissionTime))
}

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
@@ -62,20 +62,19 @@ class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
queryStatus.updateProcess(event.progress, streamingProgressRetention)
}

- override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
+ override def onQueryTerminated(
+ event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
val queryStatus = activeQueryStatus.remove(event.runId)
if (queryStatus != null) {
queryStatus.queryTerminated(event)
- inactiveQueryStatus.synchronized {
- inactiveQueryStatus += queryStatus
- while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
- inactiveQueryStatus.dequeue()
- }
+ inactiveQueryStatus += queryStatus
+ while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
+ inactiveQueryStatus.dequeue()
}
}
}

- def allQueryStatus: Seq[StreamingQueryUIData] = inactiveQueryStatus.synchronized {
+ def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
}
}
@@ -88,7 +87,7 @@ private[ui] class StreamingQueryUIData(
val name: String,
val id: UUID,
val runId: UUID,
- val submitTime: Long) {
+ val submissionTime: Long) {

/** Holds the most recent query progress updates. */
private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()

StreamingQueryTab.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.streaming.ui
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, SparkUITab}

- class StreamingQueryTab(val statusListener: StreamingQueryStatusListener, sparkUI: SparkUI)
- extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
+ private[sql] class StreamingQueryTab(
+ val statusListener: StreamingQueryStatusListener,
+ sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {

override val name = "Structured Streaming"


StreamingQueryPageSuite.scala
@@ -97,7 +97,7 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
when(streamQuery.name).thenReturn("query")
when(streamQuery.id).thenReturn(id)
when(streamQuery.runId).thenReturn(id)
- when(streamQuery.submitTime).thenReturn(1L)
+ when(streamQuery.submissionTime).thenReturn(1L)
when(streamQuery.lastProgress).thenReturn(progress)
when(streamQuery.recentProgress).thenReturn(Array(progress))
when(streamQuery.exception).thenReturn(None)
