From c6ff8a912b36465dbd2c34ee8d400862bd3d423d Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Thu, 4 Jun 2015 15:20:47 +0800
Subject: [PATCH] Display Streaming blocks in Streaming UI

---
 .../spark/streaming/StreamingContext.scala    | 10 +--
 .../spark/streaming/dstream/DStream.scala     |  3 +-
 .../receiver/ReceivedBlockHandler.scala       | 19 ++---
 .../streaming/scheduler/ReceiverTracker.scala |  7 +-
 .../scheduler/StreamingListener.scala         | 17 ++++-
 .../scheduler/StreamingListenerBus.scala      |  4 ++
 .../streaming/ui/StreamBlockUIData.scala      | 27 +++++++
 .../ui/StreamingJobProgressListener.scala     | 31 +++++++-
 .../streaming/ui/StreamingStoragePage.scala   | 68 ++++++++++++++++++
 .../streaming/ui/StreamingStorageTab.scala    | 31 ++++++++
 .../spark/streaming/ui/StreamingTab.scala     | 38 +---------
 .../spark/streaming/ui/StreamingUI.scala      | 70 +++++++++++++++++++
 .../streaming/ReceivedBlockTrackerSuite.scala |  6 +-
 .../spark/streaming/UISeleniumSuite.scala     |  9 +++
 .../StreamingJobProgressListenerSuite.scala   | 41 ++++++++++-
 15 files changed, 326 insertions(+), 55 deletions(-)
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamBlockUIData.scala
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingStoragePage.scala
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingStorageTab.scala
 create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9cd9684d36404..b024ff730130d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -42,7 +42,7 @@ import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
-import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
+import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingUI}
 import org.apache.spark.util.{CallSite, Utils}
 
 /**
@@ -185,9 +185,9 @@ class StreamingContext private[streaming] (
 
   private[streaming] val progressListener = new StreamingJobProgressListener(this)
 
-  private[streaming] val uiTab: Option[StreamingTab] =
+  private[streaming] val streamingUI: Option[StreamingUI] =
     if (conf.getBoolean("spark.ui.enabled", true)) {
-      Some(new StreamingTab(this))
+      Some(new StreamingUI(this))
     } else {
       None
     }
@@ -598,7 +598,7 @@ class StreamingContext private[streaming] (
         }
         shutdownHookRef = Utils.addShutdownHook(
           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
-        uiTab.foreach(_.attach())
+        streamingUI.foreach(_.start())
         logInfo("StreamingContext started")
       case ACTIVE =>
         logWarning("StreamingContext has already been started")
@@ -674,7 +674,7 @@ class StreamingContext private[streaming] (
         logWarning("StreamingContext has already been stopped")
       case ACTIVE =>
         scheduler.stop(stopGracefully)
-        uiTab.foreach(_.detach())
+        streamingUI.foreach(_.stop())
         StreamingContext.setActiveContext(null)
         waiter.notifyStop()
         if (shutdownHookRef != null) {
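The hunks above only move lifecycle ownership from StreamingTab to StreamingUI; whether any streaming UI is built at all is still gated by spark.ui.enabled. A minimal, hypothetical driver sketch of that toggle (app name, master, and the socket source host/port are all invented for illustration):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingUIToggleExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("streaming-ui-toggle")  // hypothetical app name
          .setMaster("local[2]")
          .set("spark.ui.enabled", "true")    // "false" => streamingUI is None, no tabs attached
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.socketTextStream("localhost", 9999).count().print()  // hypothetical source
        ssc.start()
        ssc.awaitTerminationOrTimeout(10000)
        ssc.stop()
      }
    }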
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 192aa6a139bcb..8c761afb6a4f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
-import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.scheduler.{Job, StreamingListenerBlockRemoved}
 import org.apache.spark.streaming.ui.UIUtils
 import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
 
@@ -445,6 +445,7 @@ abstract class DStream[T: ClassTag] (
         case b: BlockRDD[_] =>
           logInfo("Removing blocks of RDD " + b + " of time " + time)
           b.removeBlocks()
+          ssc.scheduler.listenerBus.post(StreamingListenerBlockRemoved(b.blockIds))
         case _ =>
       }
     }
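With this hook, clearMetadata emits one StreamingListenerBlockRemoved event per cleaned-up BlockRDD, carrying every block id of that batch. A hedged sketch of the payload, with invented ids standing in for b.blockIds:

    import org.apache.spark.storage.StreamBlockId
    import org.apache.spark.streaming.scheduler.StreamingListenerBlockRemoved

    // Two fake block ids standing in for the ids of a cleaned-up BlockRDD.
    val removedIds = Seq(StreamBlockId(streamId = 0, uniqueId = 1L),
                         StreamBlockId(streamId = 0, uniqueId = 2L))
    val event = StreamingListenerBlockRemoved(removedIds)
    // ssc.scheduler.listenerBus.post(event)  // what the new line in clearMetadata does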
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 207d64d9414ee..6ac488ce44330 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
   def blockId: StreamBlockId // Any implementation of this trait will store a block id
+  def blockStatus: BlockStatus
 }
 
 /** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -51,8 +52,8 @@
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
-  extends ReceivedBlockStoreResult
+private[streaming] case class BlockManagerBasedStoreResult(
+    blockId: StreamBlockId, blockStatus: BlockStatus) extends ReceivedBlockStoreResult
 
 
 /**
@@ -75,11 +76,11 @@ private[streaming] class BlockManagerBasedBlockHandler(
         throw new SparkException(
           s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
     }
-    if (!putResult.map { _._1 }.contains(blockId)) {
+    val blockStatus = putResult.filter(_._1 == blockId).map(_._2).headOption.getOrElse {
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
     }
-    BlockManagerBasedStoreResult(blockId)
+    BlockManagerBasedStoreResult(blockId, blockStatus)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -96,6 +97,7 @@
  */
 private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
+    blockStatus: BlockStatus,
     walRecordHandle: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult
 
@@ -167,10 +169,11 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     val storeInBlockManagerFuture = Future {
       val putResult =
         blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
-      if (!putResult.map { _._1 }.contains(blockId)) {
+      val blockStatus = putResult.filter(_._1 == blockId).map(_._2).headOption.getOrElse {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
       }
+      blockStatus
     }
 
     // Store the block in write ahead log
@@ -179,9 +182,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     }
 
     // Combine the futures, wait for both to complete, and return the write ahead log record handle
-    val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
-    val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
-    WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
+    val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture)
+    val (blockStatus, walRecordHandle) = Await.result(combinedFuture, blockStoreTimeout)
+    WriteAheadLogBasedStoreResult(blockId, blockStatus, walRecordHandle)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f1504b09c9873..7fa8ce93b8e5b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -178,7 +178,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
   /** Add new blocks for the given stream */
   private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
-    receivedBlockTracker.addBlock(receivedBlockInfo)
+    if (receivedBlockTracker.addBlock(receivedBlockInfo)) {
+      listenerBus.post(StreamingListenerBlockAdded(receivedBlockInfo))
+      true
+    } else {
+      false
+    }
   }
 
   /** Report error sent by a receiver */
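Both handlers rely on the same extraction idiom: the BlockManager's put methods report back a Seq[(BlockId, BlockStatus)], and the status for the stored id is pulled out or the store fails loudly. The idiom in isolation, against a toy put result (sizes invented):

    import org.apache.spark.SparkException
    import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, StreamBlockId}

    val blockId: BlockId = StreamBlockId(0, 0L)
    // A stand-in for what BlockManager.putBytes would report back.
    val putResult: Seq[(BlockId, BlockStatus)] =
      Seq((blockId, BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L, 0L)))
    val blockStatus = putResult.filter(_._1 == blockId).map(_._2).headOption.getOrElse {
      throw new SparkException(s"Could not store $blockId to block manager")
    }
    println(blockStatus.memSize)  // => 100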
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 74dbba453f026..2d368a3e976fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -19,8 +19,9 @@ package org.apache.spark.streaming.scheduler
 
 import scala.collection.mutable.Queue
 
-import org.apache.spark.util.Distribution
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.storage.BlockId
+import org.apache.spark.util.Distribution
 
 /**
  * :: DeveloperApi ::
@@ -50,6 +51,14 @@ case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
 case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
 
+@DeveloperApi
+case class StreamingListenerBlockAdded(blockInfo: ReceivedBlockInfo)
+  extends StreamingListenerEvent
+
+@DeveloperApi
+case class StreamingListenerBlockRemoved(blockIds: Seq[BlockId])
+  extends StreamingListenerEvent
+
 /**
  * :: DeveloperApi ::
  * A listener interface for receiving information about an ongoing streaming
@@ -75,6 +84,12 @@ trait StreamingListener {
 
   /** Called when processing of a batch of jobs has completed. */
   def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
+
+  /** Called when a block of received data has been added to the block manager. */
+  def onBlockAdded(blockAdded: StreamingListenerBlockAdded) { }
+
+  /** Called when blocks of an old batch have been removed from the block manager. */
+  def onBlockRemoved(blockRemoved: StreamingListenerBlockRemoved) { }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index b07d6cf347ca7..cf5e42deac161 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -43,6 +43,10 @@ private[spark] class StreamingListenerBus
         listener.onBatchStarted(batchStarted)
       case batchCompleted: StreamingListenerBatchCompleted =>
         listener.onBatchCompleted(batchCompleted)
+      case blockAdded: StreamingListenerBlockAdded =>
+        listener.onBlockAdded(blockAdded)
+      case blockRemoved: StreamingListenerBlockRemoved =>
+        listener.onBlockRemoved(blockRemoved)
       case _ =>
     }
   }
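Because both events are marked @DeveloperApi, user code can observe them once this lands. A hedged sketch of a third-party listener (the class name and println reporting are invented); note that blockAdded.blockInfo has a private[streaming] type, so a listener outside Spark's streaming package should stick to logging and counting:

    import org.apache.spark.streaming.scheduler.{
      StreamingListener, StreamingListenerBlockAdded, StreamingListenerBlockRemoved}

    class BlockAuditListener extends StreamingListener {
      // blockAdded.blockInfo is private[streaming], so only log the event itself here.
      override def onBlockAdded(blockAdded: StreamingListenerBlockAdded): Unit =
        println(s"Block added: $blockAdded")
      override def onBlockRemoved(blockRemoved: StreamingListenerBlockRemoved): Unit =
        println(s"Blocks removed: ${blockRemoved.blockIds.mkString(", ")}")
    }

    // Registered the same way as any other streaming listener:
    // ssc.addStreamingListener(new BlockAuditListener)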
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamBlockUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamBlockUIData.scala
new file mode 100644
index 0000000000000..273b5bdd4b176
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamBlockUIData.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+
+private[streaming] case class StreamBlockUIData(
+    blockId: StreamBlockId,
+    storageLevel: StorageLevel,
+    memSize: Long,
+    diskSize: Long,
+    externalBlockStoreSize: Long)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 68e8ce98945e0..48b90744c0d92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
-
+import org.apache.spark.storage.StreamBlockId
 
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   extends StreamingListener with SparkListener {
@@ -42,6 +42,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   private var totalReceivedRecords = 0L
   private var totalProcessedRecords = 0L
   private val receiverInfos = new HashMap[Int, ReceiverInfo]
+  // Stores the UI data of all the blocks that the streaming job currently holds
+  private val blockInfos = new HashMap[StreamBlockId, StreamBlockUIData]
 
   // Because onJobStart and onBatchXXX messages are processed in different threads,
   // we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
@@ -245,6 +247,33 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     }
     batchUIData
   }
+
+  override def onBlockAdded(blockAdded: StreamingListenerBlockAdded): Unit = {
+    val blockId = blockAdded.blockInfo.blockId
+    val blockStatus = blockAdded.blockInfo.blockStoreResult.blockStatus
+    val blockUIData = StreamBlockUIData(
+      blockId,
+      blockStatus.storageLevel,
+      blockStatus.memSize,
+      blockStatus.diskSize,
+      blockStatus.externalBlockStoreSize)
+    synchronized {
+      blockInfos += (blockId -> blockUIData)
+    }
+  }
+
+  override def onBlockRemoved(blockRemoved: StreamingListenerBlockRemoved): Unit = {
+    // All ids posted by the streaming code are StreamBlockIds; `collect` guards against others
+    val blocksToRemove =
+      blockRemoved.blockIds.collect { case blockId: StreamBlockId => blockId }
+    synchronized {
+      blockInfos --= blocksToRemove
+    }
+  }
+
+  def getAllBlocks: Seq[StreamBlockUIData] = synchronized {
+    blockInfos.values.toSeq
+  }
 }
 
 private[streaming] object StreamingJobProgressListener {
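blockInfos is written by the listener-bus thread and read by Jetty request threads, hence the synchronized block around every access. The same pattern in a self-contained, hypothetical registry (an id-to-memSize map stands in for the StreamBlockUIData payload):

    import scala.collection.mutable
    import org.apache.spark.storage.StreamBlockId

    // Hypothetical stand-in for the listener's blockInfos bookkeeping.
    object BlockRegistry {
      private val blocks = new mutable.HashMap[StreamBlockId, Long]  // id -> memSize
      def add(id: StreamBlockId, memSize: Long): Unit = synchronized { blocks(id) = memSize }
      def remove(ids: Seq[StreamBlockId]): Unit = synchronized { blocks --= ids }
      def snapshot: Seq[(StreamBlockId, Long)] = synchronized { blocks.toSeq }
    }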
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingStoragePage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingStoragePage.scala
new file mode 100644
index 0000000000000..f99240fef0c5d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingStoragePage.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.Logging
+import org.apache.spark.ui.WebUIPage
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
+import org.apache.spark.util.Utils
+
+private[ui] class StreamingStoragePage(parent: StreamingStorageTab)
+  extends WebUIPage("") with Logging {
+
+  private val listener = parent.listener
+
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    // Sort by (uniqueId, streamId) in descending order so that the newest blocks come first
+    val blockInfos =
+      listener.getAllBlocks.sortBy(b => (b.blockId.uniqueId, b.blockId.streamId)).reverse
+    val content = SparkUIUtils.listingTable(blockHeader, blockRow, blockInfos)
+    SparkUIUtils.headerSparkPage("Receiver Blocks", content, parent)
+  }
+
+  private def blockHeader = Seq(
+    "Block Id",
+    "Storage Level",
+    "Size in Memory",
+    "Size in ExternalBlockStore",
+    "Size on Disk")
+
+  private def blockRow(blockInfo: StreamBlockUIData): Seq[Node] = {
+    <tr>
+      <td>
+        {blockInfo.blockId.toString}
+      </td>
+      <td>
+        {blockInfo.storageLevel.description}
+      </td>
+      <td>
+        {Utils.bytesToString(blockInfo.memSize)}
+      </td>
+      <td>
+        {Utils.bytesToString(blockInfo.externalBlockStoreSize)}
+      </td>
+      <td>
+        {Utils.bytesToString(blockInfo.diskSize)}
+      </td>
+    </tr>
+  }
+}
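The descending sort via sortBy(...).reverse assumes a receiver's uniqueId grows monotonically, so a larger id means a newer block. The same idiom on toy ids (values invented):

    import org.apache.spark.storage.StreamBlockId

    val ids = Seq(StreamBlockId(1, 5L), StreamBlockId(0, 9L), StreamBlockId(0, 5L))
    val newestFirst = ids.sortBy(b => (b.uniqueId, b.streamId)).reverse
    // newestFirst renders as: input-0-9, input-1-5, input-0-5 (largest uniqueId first)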
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingStorageTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingStorageTab.scala
new file mode 100644
index 0000000000000..bd21a2388ebc7
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingStorageTab.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.apache.spark.Logging
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+/**
+ * Spark Web UI tab that shows blocks of a streaming job.
+ */
+private[ui] class StreamingStorageTab(parent: SparkUI, val listener: StreamingJobProgressListener)
+  extends SparkUITab(parent, "streaming storage") with Logging {
+
+  attachPage(new StreamingStoragePage(this))
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index e0c0f57212f55..0794949f46dcd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -17,50 +17,18 @@
 
 package org.apache.spark.streaming.ui
 
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.ui.{JettyUtils, SparkUI, SparkUITab}
-
-import StreamingTab._
+import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 /**
  * Spark Web UI tab that shows statistics of a streaming job.
  * This assumes the given SparkContext has enabled its SparkUI.
  */
-private[spark] class StreamingTab(val ssc: StreamingContext)
-  extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
+private[spark] class StreamingTab(parent: SparkUI, val ssc: StreamingContext)
+  extends SparkUITab(parent, "streaming") {
 
-  private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"
-
-  val parent = getSparkUI(ssc)
   val listener = ssc.progressListener
 
-  ssc.addStreamingListener(listener)
-  ssc.sc.addSparkListener(listener)
-
   attachPage(new StreamingPage(this))
   attachPage(new BatchPage(this))
-
-  var staticHandler: ServletContextHandler = null
-
-  def attach() {
-    getSparkUI(ssc).attachTab(this)
-    staticHandler = JettyUtils.createStaticHandler(STATIC_RESOURCE_DIR, "/static/streaming")
-    getSparkUI(ssc).attachHandler(staticHandler)
-  }
-
-  def detach() {
-    getSparkUI(ssc).detachTab(this)
-    getSparkUI(ssc).detachHandler(staticHandler)
-    staticHandler = null
-  }
-}
-
-private object StreamingTab {
-  def getSparkUI(ssc: StreamingContext): SparkUI = {
-    ssc.sc.ui.getOrElse {
-      throw new SparkException("Parent SparkUI to attach this tab to not found!")
-    }
-  }
-}
+}
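A design note on this refactor: StreamingTab used to attach itself and the static handler, while StreamingUI (next file) now owns that lifecycle, so a single stop() call tears down both tabs and the handler. A hedged, hypothetical sketch of the same attach-once/detach-once shape using Option instead of nullable vars:

    // Hypothetical helper, not part of the patch: tracks one attachable
    // resource and detaches it at most once.
    final class Attached[T](attach: () => T, detach: T => Unit) {
      private var current: Option[T] = None
      def start(): Unit = { if (current.isEmpty) current = Some(attach()) }
      def stop(): Unit = { current.foreach(detach); current = None }
    }

stop() stays safe to call twice, mirroring the null checks in StreamingUI.stop() below.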
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
new file mode 100644
index 0000000000000..af40941ce9678
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.apache.spark.SparkException
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.ui.{JettyUtils, SparkUI}
+
+private[streaming] class StreamingUI(val ssc: StreamingContext) {
+
+  private val STATIC_RESOURCE_DIR = "org/apache/spark/streaming/ui/static"
+
+  val listener = ssc.progressListener
+  ssc.addStreamingListener(listener)
+  ssc.sc.addSparkListener(listener)
+
+  private var staticHandler: ServletContextHandler = null
+  private var streamingTab: StreamingTab = null
+  private var streamingStorageTab: StreamingStorageTab = null
+
+  def start() {
+    val sparkUI = StreamingUI.getSparkUI(ssc)
+    streamingTab = new StreamingTab(sparkUI, ssc)
+    sparkUI.attachTab(streamingTab)
+    streamingStorageTab = new StreamingStorageTab(sparkUI, listener)
+    sparkUI.attachTab(streamingStorageTab)
+    staticHandler = JettyUtils.createStaticHandler(STATIC_RESOURCE_DIR, "/static/streaming")
+    sparkUI.attachHandler(staticHandler)
+  }
+
+  def stop() {
+    val sparkUI = StreamingUI.getSparkUI(ssc)
+    if (streamingTab != null) {
+      sparkUI.detachTab(streamingTab)
+      streamingTab = null
+    }
+    if (streamingStorageTab != null) {
+      sparkUI.detachTab(streamingStorageTab)
+      streamingStorageTab = null
+    }
+    if (staticHandler != null) {
+      sparkUI.detachHandler(staticHandler)
+      staticHandler = null
+    }
+  }
+}
+
+private[streaming] object StreamingUI {
+  def getSparkUI(ssc: StreamingContext): SparkUI = {
+    ssc.sc.ui.getOrElse {
+      throw new SparkException("Parent SparkUI to attach the streaming UI to was not found!")
+    }
+  }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 6f0ee774cb5cf..6a2829bf4f8cb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite}
-import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.storage.{BlockStatus, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
@@ -225,7 +225,9 @@ class ReceivedBlockTrackerSuite
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
     List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
-      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
+      BlockManagerBasedStoreResult(
+        StreamBlockId(streamId, math.abs(Random.nextInt)),
+        BlockStatus(StorageLevel.MEMORY_AND_DISK, 100, 100, 0))))
   }
 
   /** Get all the data written in the given write ahead log file. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index cbc24aee4fa1e..2f4aff1cc55d8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -86,9 +86,14 @@ class UISeleniumSuite
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
       go to (sparkUI.appUIAddress.stripSuffix("/"))
       find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
+      find(cssSelector( """ul li a[href*="streaming storage"]""")) should not be (None)
     }
 
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      // check whether streaming storage page exists
+      go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming%20storage")
+      findAll(cssSelector("h3")).map(_.text).toSeq should contain("Receiver Blocks")
+
       // check whether streaming page exists
       go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
       val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -190,6 +195,10 @@ class UISeleniumSuite
       go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
       val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
       h3Text should not contain("Streaming Statistics")
+
+      // check whether streaming storage page exists
+      go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming%20storage")
+      findAll(cssSelector("h3")).map(_.text).toSeq should not contain("Receiver Blocks")
     }
   }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index c9175d61b1f49..6745a16f1ddb8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.streaming.ui
 
 import java.util.Properties
 
 import org.scalatest.Matchers
 
 import org.apache.spark.scheduler.SparkListenerJobStart
+import org.apache.spark.storage.{BlockStatus, StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockManagerBasedStoreResult, ReceivedBlockStoreResult}
@@ -134,6 +136,30 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.receiverInfo(1) should be (Some(receiverInfoError))
     listener.receiverInfo(2) should be (Some(receiverInfoStopped))
     listener.receiverInfo(3) should be (None)
+
+    // onBlockAdded
+    val block = BlockManagerBasedStoreResult(
+      StreamBlockId(streamId = 0, uniqueId = 0),
+      BlockStatus(StorageLevel.MEMORY_AND_DISK, 100, 100, 0)
+    )
+    val blockInfo = ReceivedBlockInfo(streamId = 0, 100, None, block)
+    listener.onBlockAdded(StreamingListenerBlockAdded(blockInfo))
+    val blocks = listener.getAllBlocks
+    blocks should have size (1)
+    val expectedUIData =
+      StreamBlockUIData(
+        StreamBlockId(streamId = 0, uniqueId = 0),
+        StorageLevel.MEMORY_AND_DISK,
+        100,
+        100,
+        0
+      )
+    blocks(0) should be (expectedUIData)
+
+    // onBlockRemoved
+    listener.onBlockRemoved(
+      StreamingListenerBlockRemoved(Seq(StreamBlockId(streamId = 0, uniqueId = 0))))
+    listener.getAllBlocks should be (Seq.empty)
   }
 
   test("Remove the old completed batches when exceeding the limit") {
@@ -210,7 +236,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
 
-    for (_ <- 0 until 2 * limit) {
+    for (i <- 0 until 2 * limit) {
       val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
 
       // onBatchSubmitted
@@ -237,6 +263,18 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
       // onBatchCompleted
       val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+
+      // onBlockAdded
+      val block = BlockManagerBasedStoreResult(
+        StreamBlockId(streamId = i, uniqueId = i),
+        BlockStatus(StorageLevel.MEMORY_AND_DISK, 100, 100, 0)
+      )
+      val blockInfo = ReceivedBlockInfo(streamId = i, 100, None, block)
+      listener.onBlockAdded(StreamingListenerBlockAdded(blockInfo))
+
+      // onBlockRemoved
+      listener.onBlockRemoved(
+        StreamingListenerBlockRemoved(Seq(StreamBlockId(streamId = i, uniqueId = i))))
     }
 
     listener.waitingBatches.size should be (0)
@@ -245,6 +283,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <=
       (listener.waitingBatches.size + listener.runningBatches.size +
       listener.retainedCompletedBatches.size + 10)
+    listener.getAllBlocks should be (Seq.empty)
   }
 }