Commit

Clean up the codes
zsxwing committed Apr 30, 2015
1 parent a459f49 commit d0b0aec
Showing 10 changed files with 351 additions and 320 deletions.
@@ -31,7 +31,7 @@
 }
 
 .tooltip-inner {
-  max-width: 500px !important;
+  max-width: 500px !important; // Make sure we only have one line tooltip
 }
 
 .line {
@@ -46,19 +46,13 @@
 }
 
 .bar rect:hover {
-  //fill: rgb(49, 91, 125);
-  //fill: #005580;
   fill: rgb(0, 194, 255);
 }
 
 .stable-text text:hover {
-  fill: #0088cc;
+  fill: #00c2ff;
 }
 
-.timeline {
-  width: 500px;
-}
-
-.distribution {
+.histogram {
   width: auto;
 }
309 changes: 139 additions & 170 deletions core/src/main/resources/org/apache/spark/ui/static/streaming-page.js

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -49,8 +49,8 @@ private[spark] object UIUtils extends Logging {
   }
 
   /**
-   * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value after converting with
-   * the `TimeUnit`.
+   * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
+   * after converting, also with its TimeUnit.
    */
   def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
     if (milliseconds < 1000) {
@@ -65,7 +65,11 @@ private[spark] object UIUtils extends Logging {
       return (minutes, TimeUnit.MINUTES)
     }
     val hours = minutes / 60
-    (hours, TimeUnit.HOURS)
+    if (hours < 24) {
+      return (hours, TimeUnit.HOURS)
+    }
+    val days = hours / 24
+    (days, TimeUnit.DAYS)
   }
 
   def formatDate(date: Date): String = dateFormat.get.format(date)
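For reference, a sketch of normalizeDuration as it reads after this hunk. The milliseconds, minutes, and hours branches are visible in the diff; the seconds branch is assumed from context, not shown by the rendered hunks.

import java.util.concurrent.TimeUnit

// Pick the largest TimeUnit that keeps the value readable; this patch
// extends the chain so durations of a day or more report in DAYS.
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
  if (milliseconds < 1000) {
    return (milliseconds.toDouble, TimeUnit.MILLISECONDS)
  }
  val seconds = milliseconds.toDouble / 1000  // assumed intermediate step
  if (seconds < 60) {
    return (seconds, TimeUnit.SECONDS)
  }
  val minutes = seconds / 60
  if (minutes < 60) {
    return (minutes, TimeUnit.MINUTES)
  }
  val hours = minutes / 60
  if (hours < 24) {
    return (hours, TimeUnit.HOURS)
  }
  val days = hours / 24
  (days, TimeUnit.DAYS)
}

// e.g. normalizeDuration(36L * 60 * 60 * 1000) == (1.5, TimeUnit.DAYS)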
52 changes: 52 additions & 0 deletions core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.ui
+
+import java.util.concurrent.TimeUnit
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+class UIUtilsSuite extends FunSuite with Matchers{
+
+  test("shortTimeUnitString") {
+    assert("ns" === UIUtils.shortTimeUnitString(TimeUnit.NANOSECONDS))
+    assert("us" === UIUtils.shortTimeUnitString(TimeUnit.MICROSECONDS))
+    assert("ms" === UIUtils.shortTimeUnitString(TimeUnit.MILLISECONDS))
+    assert("sec" === UIUtils.shortTimeUnitString(TimeUnit.SECONDS))
+    assert("min" === UIUtils.shortTimeUnitString(TimeUnit.MINUTES))
+    assert("hrs" === UIUtils.shortTimeUnitString(TimeUnit.HOURS))
+    assert("days" === UIUtils.shortTimeUnitString(TimeUnit.DAYS))
+  }
+
+  test("normalizeDuration") {
+    verifyNormalizedTime(900, TimeUnit.MILLISECONDS, 900)
+    verifyNormalizedTime(1.0, TimeUnit.SECONDS, 1000)
+    verifyNormalizedTime(1.0, TimeUnit.MINUTES, 60 * 1000)
+    verifyNormalizedTime(1.0, TimeUnit.HOURS, 60 * 60 * 1000)
+    verifyNormalizedTime(1.0, TimeUnit.DAYS, 24 * 60 * 60 * 1000)
+  }
+
+  private def verifyNormalizedTime(
+      expectedTime: Double, expectedUnit: TimeUnit, input: Long): Unit = {
+    val (time, unit) = UIUtils.normalizeDuration(input)
+    time should be (expectedTime +- 1E-6)
+    unit should be (expectedUnit)
+  }
+}
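The first test pins down UIUtils.shortTimeUnitString, which the rendered diff does not show. A plausible sketch, inferred purely from the assertions above rather than from the actual UIUtils source:

import java.util.concurrent.TimeUnit

// Inferred from the test expectations; the real implementation in
// UIUtils is not part of this commit's rendered diff.
def shortTimeUnitString(unit: TimeUnit): String = unit match {
  case TimeUnit.NANOSECONDS  => "ns"
  case TimeUnit.MICROSECONDS => "us"
  case TimeUnit.MILLISECONDS => "ms"
  case TimeUnit.SECONDS      => "sec"
  case TimeUnit.MINUTES      => "min"
  case TimeUnit.HOURS        => "hrs"
  case TimeUnit.DAYS         => "days"
}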
@@ -32,6 +32,7 @@ case class ReceiverInfo(
     active: Boolean,
     location: String,
     lastErrorMessage: String = "",
-    lastError: String = ""
+    lastError: String = "",
+    lastErrorTime: Long = -1L
   ) {
 }
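The -1L default acts as a "never errored" sentinel, so UI code can skip rendering a timestamp for healthy receivers. A minimal sketch of such a check (this formatting helper is hypothetical, not part of the diff; UIUtils.formatDate is shown in the UIUtils hunk above):

import java.util.Date

// Hypothetical rendering helper: show "-" until a receiver has failed,
// otherwise format the recorded error time.
def lastErrorTimeCell(lastErrorTime: Long): String =
  if (lastErrorTime < 0) "-" else UIUtils.formatDate(new Date(lastErrorTime))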
@@ -155,10 +155,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   private def deregisterReceiver(streamId: Int, message: String, error: String) {
     val newReceiverInfo = receiverInfo.get(streamId) match {
       case Some(oldInfo) =>
-        oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error)
+        oldInfo.copy(actor = null, active = false, lastErrorMessage = message,
+          lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
       case None =>
         logWarning("No prior receiver info")
-        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+          lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
     }
     receiverInfo -= streamId
     listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
@@ -182,7 +184,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         oldInfo.copy(lastErrorMessage = message, lastError = error)
       case None =>
         logWarning("No prior receiver info")
-        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+          lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
     }
     receiverInfo(streamId) = newReceiverInfo
     listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
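Note that the error timestamp now comes from ssc.scheduler.clock.getTimeMillis() rather than System.currentTimeMillis() (the latter is removed from the listener in the StreamingJobProgressListener diff below). Routing time through a clock abstraction makes the timestamp controllable in tests. A minimal sketch of that pattern, with hypothetical stand-in types; Spark's own Clock/ManualClock utilities play this role in the real code base:

// Hypothetical stand-ins illustrating the design choice.
trait Clock { def getTimeMillis(): Long }

class ManualClock(private var now: Long = 0L) extends Clock {
  override def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = { now += ms }  // tests advance time explicitly
}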
@@ -27,7 +27,7 @@ private[ui] abstract class BatchTableBase(tableId: String) {
   protected def columns: Seq[Node] = {
     <th>Batch Time</th>
       <th>Input Size</th>
-      <th>Scheduling Delay</th>
+      <th>Streaming Scheduling Delay</th>
       <th>Processing Time</th>
   }
 
@@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
 import org.apache.spark.streaming.scheduler.BatchInfo
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
-import org.apache.spark.util.Distribution
 
 
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -39,7 +38,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   private var totalReceivedRecords = 0L
   private var totalProcessedRecords = 0L
   private val receiverInfos = new HashMap[Int, ReceiverInfo]
-  private val receiverLastErrorTime = new HashMap[Int, Long]
 
   val batchDuration = ssc.graph.batchDuration.milliseconds
 
@@ -52,7 +50,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   override def onReceiverError(receiverError: StreamingListenerReceiverError) {
     synchronized {
       receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
-      receiverLastErrorTime(receiverError.receiverInfo.streamId) = System.currentTimeMillis()
     }
   }
 
@@ -123,32 +120,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     completedBatchInfos.toSeq
   }
 
-  def processingDelayDistribution: Option[Distribution] = synchronized {
-    extractDistribution(_.processingDelay)
-  }
-
-  def schedulingDelayDistribution: Option[Distribution] = synchronized {
-    extractDistribution(_.schedulingDelay)
-  }
-
-  def totalDelayDistribution: Option[Distribution] = synchronized {
-    extractDistribution(_.totalDelay)
-  }
-
-  def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
-    val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
-    val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
-    (0 until numReceivers).map { receiverId =>
-      val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
-        batchInfo.get(receiverId).getOrElse(Array.empty)
-      }
-      val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
-        // calculate records per second for each batch
-        blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
-      }
-      val distributionOption = Distribution(recordsOfParticularReceiver)
-      (receiverId, distributionOption)
-    }.toMap
+  def allReceivers: Seq[Int] = synchronized {
+    receiverInfos.keys.toSeq
   }
 
   def receivedRecordsWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
@@ -184,10 +157,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     receiverInfos.get(receiverId)
   }
 
-  def receiverLastErrorTime(receiverId: Int): Option[Long] = synchronized {
-    receiverLastErrorTime.get(receiverId)
-  }
-
   def lastCompletedBatch: Option[BatchInfo] = synchronized {
     completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
   }
@@ -200,8 +169,4 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     (waitingBatchInfos.values.toSeq ++
       runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
   }
-
-  private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
-    Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
-  }
 }
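The removed Distribution-based summaries are superseded by per-batch time series such as receivedRecordsWithBatchTime, whose Map[Int, Seq[(Long, Double)]] result keys (batch time, records-per-second) pairs by receiver id. A minimal sketch of the rate computation that survives from the removed receivedRecordsDistributions, with hypothetical parameter names:

// Records in one batch, normalized to records per second and paired with
// the batch time so the new timeline charts can plot it directly.
def recordRatePoint(batchTimeMs: Long, numRecords: Long, batchDurationMs: Long): (Long, Double) =
  (batchTimeMs, numRecords.toDouble * 1000 / batchDurationMs)

// e.g. 2000 records in a 1000 ms batch at t = 10000 -> (10000, 2000.0)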