Skip to content

Commit

Permalink
[SPARK-6860][Streaming][WebUI] Fix the possible inconsistency of Stre…
Browse files Browse the repository at this point in the history
…amingPage

Because `StreamingPage.render` doesn't hold the `listener` lock when generating the content, the different parts of content may have some inconsistent values if `listener` updates its status at the same time. And it will confuse people.

This PR added `listener.synchronized` to make sure we have a consistent view of StreamingJobProgressListener when creating the content.

Author: zsxwing <zsxwing@gmail.com>

Closes #5470 from zsxwing/SPARK-6860 and squashes the following commits:

cec6f92 [zsxwing] Add missing 'synchronized' in StreamingJobProgressListener
7182498 [zsxwing] Add synchronized to make sure we have a consistent view of StreamingJobProgressListener when creating the content
  • Loading branch information
zsxwing authored and srowen committed Apr 15, 2015
1 parent 0e5ca9e commit 2954a1e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}.toMap
}

def lastReceivedBatchRecords: Map[Int, Long] = {
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numReceivers).map { receiverId =>
Expand All @@ -155,19 +155,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}

def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
def receiverInfo(receiverId: Int): Option[ReceiverInfo] = synchronized {
receiverInfos.get(receiverId)
}

def lastCompletedBatch: Option[BatchInfo] = {
def lastCompletedBatch: Option[BatchInfo] = synchronized {
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
}

def lastReceivedBatch: Option[BatchInfo] = {
def lastReceivedBatch: Option[BatchInfo] = synchronized {
retainedBatches.lastOption
}

private def retainedBatches: Seq[BatchInfo] = synchronized {
private def retainedBatches: Seq[BatchInfo] = {
(waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ private[ui] class StreamingPage(parent: StreamingTab)

/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
val content =
val content = listener.synchronized {
generateBasicStats() ++ <br></br> ++
<h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
generateReceiverStats() ++
generateBatchStatsTable()
}
UIUtils.headerSparkPage("Streaming", content, parent, Some(5000))
}

Expand Down

0 comments on commit 2954a1e

Please sign in to comment.