diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
index 357a0b61a3b9e..5da9d631ad124 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
@@ -31,7 +31,7 @@
}
.tooltip-inner {
- max-width: 500px !important;
+ max-width: 500px !important; /* Make sure we only have one line tooltip */
}
.line {
@@ -46,19 +46,13 @@
}
.bar rect:hover {
- //fill: rgb(49, 91, 125);
- //fill: #005580;
- fill: rgb(0, 194, 255);
-}
-
-.stable-text text:hover {
- fill: #0088cc;
+ fill: #00c2ff;
}
.timeline {
width: 500px;
}
-.distribution {
+.histogram {
width: auto;
}
diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
index d473d0dd16f59..acf5504ac4edf 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
@@ -16,56 +16,82 @@
*/
-var timelineMarginLeft = 50;
-var distributionMinX = 0;
-var distributionMaxX = 0;
-var binCount = 10;
-
-// An invisible div to show details of a point in the graph
-var graphTooltip = d3.select("body").append("div")
- .attr("class", "label")
- .style("display", "inline-block")
- .style("position", "absolute")
- .style("z-index", "10")
- .style("visibility", "hidden")
- .text("");
-
-// Show "text" at location (x, y)
-function showGraphTooltip(text, x, y) {
- var left = x;
- var top = y;
- graphTooltip.style("visibility", "visible")
- .style("top", top + "px")
- .style("left", left + "px")
- .text(text);
-}
+// timeFormat: StreamingPage.scala will generate a global "timeFormat" dictionary to store the time
+// and its formatted string. Because we cannot specify a timezone in JavaScript, to make sure the
+// server and client use the same timezone, we use the "timeFormat" dictionary to format all time
+// values used in the graphs.
-// Hide "graphTooltip"
-function hideGraphTooltip() {
- graphTooltip.style("visibility", "hidden");
-}
+// A global margin left for all timeline graphs. It will be set in "registerTimeline". This will be
+// used to align all timeline graphs.
+var maxMarginLeftForTimeline = 0;
+
+// The max X values for all histograms. It will be set in "registerHistogram".
+var maxXForHistogram = 0;
+var histogramBinCount = 10;
+var yValueFormat = d3.format(",.2f");
+
+// Show a tooltip "text" for "node"
function showBootstrapTooltip(node, text) {
- console.log(text);
- $(node).tooltip({title: text, trigger: "manual", 'container': 'body'});
+ $(node).tooltip({title: text, trigger: "manual", container: "body"});
$(node).tooltip("show");
- console.log($(node));
}
+// Hide the tooltip for "node"
function hideBootstrapTooltip(node) {
$(node).tooltip("destroy");
}
+// Register a timeline graph. Every timeline graph must be registered before the first call to
+// "drawTimeline" so that the largest left margin needed by any of them is known up front.
+function registerTimeline(minY, maxY) {
+    // Estimate the pixel width of the formatted "maxY" tick label:
+    // 8px per character plus a constant 10px
+    var tickLabelWidth = yValueFormat(maxY).length * 8 + 10;
+    // Keep the widest estimate seen so far so that every y axis has room for its ticks
+    maxMarginLeftForTimeline = Math.max(maxMarginLeftForTimeline, tickLabelWidth);
+}
+
+// Register a histogram graph. Every histogram graph must be registered before the first call to
+// "drawHistogram" so that the largest X value across all histograms is known up front.
+function registerHistogram(values, minY, maxY) {
+    var bins = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values);
+    // For each bin, "x" holds the y axis value while "y" holds the value drawn along the x axis
+    var largestX = d3.max(bins, function(bin) { return bin.y; });
+    if (largestX > maxXForHistogram) { maxXForHistogram = largestX; }
+}
+
+// Draw a dashed, light blue line between (x1, y1) and (x2, y2), mapped through xFunc and yFunc
+function drawLine(svg, xFunc, yFunc, x1, y1, x2, y2) {
+    var endpoints = [{x: x1, y: y1}, {x: x2, y: y2}];
+    var pathGenerator = d3.svg.line()
+        .x(function(point) { return xFunc(point.x); })
+        .y(function(point) { return yFunc(point.y); });
+    svg.append("path")
+        .datum(endpoints)
+        .attr("class", "line")
+        .style("stroke", "lightblue")
+        .style("stroke-dasharray", ("6, 6"))
+        .attr("d", pathGenerator);
+}
+
/**
* @param id the `id` used in the html `div` tag
* @param data the data for the timeline graph
+ * @param minX the min value of X axis
+ * @param maxX the max value of X axis
* @param minY the min value of Y axis
* @param maxY the max value of Y axis
* @param unitY the unit of Y axis
+ * @param batchInterval if "batchInterval" is specified, we will draw a line for "batchInterval" in the graph
*/
-function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchTime) {
- d3.select(d3.select(id).node().parentNode).style("padding", "8px 0 8px 8px").style("border-right", "0px solid white");
- var margin = {top: 20, right: 27, bottom: 30, left: timelineMarginLeft};
+function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) {
+ // Hide the right border of "<td>". We cannot use "css" directly, or "sorttable.js" will override them.
+ d3.select(d3.select(id).node().parentNode)
+ .style("padding", "8px 0 8px 8px")
+ .style("border-right", "0px solid white");
+
+ var margin = {top: 20, right: 27, bottom: 30, left: maxMarginLeftForTimeline};
var width = 500 - margin.left - margin.right;
var height = 150 - margin.top - margin.bottom;
@@ -102,23 +128,8 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchTime) {
.text(unitY);
- if (batchTime) {
- var batchTimeline = d3.svg.line()
- .x(function(d) { return x(d.x); })
- .y(function(d) { return y(d.y); });
-
- console.log(batchTime);
- var batchTimeData = [
- {x: minX, y: batchTime}, {x: maxX, y: batchTime}
- ];
- console.log(batchTimeData);
-
- svg.append("path")
- .datum(batchTimeData)
- .style("stroke-dasharray", ("6, 6"))
- .style("stroke", "lightblue")
- .attr("class", "line")
- .attr("d", batchTimeline);
+ if (batchInterval && batchInterval <= maxY) {
+ drawLine(svg, x, y, minX, batchInterval, maxX, batchInterval);
}
svg.append("path")
@@ -131,87 +142,69 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchTime) {
svg.selectAll(".point")
.data(data)
.enter().append("circle")
- .attr("stroke", "white") // white and opacity = 0 make it invisible
+ .attr("stroke", "white") // white and opacity = 0 make it invisible
+ .attr("fill", "white")
+ .attr("opacity", "0")
+ .attr("cx", function(d) { return x(d.x); })
+ .attr("cy", function(d) { return y(d.y); })
+ .attr("r", function(d) { return 3; })
+ .on('mouseover', function(d) {
+ var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x];
+ showBootstrapTooltip(d3.select(this).node(), tip);
+ // show the point
+ d3.select(this)
+ .attr("stroke", "steelblue")
+ .attr("fill", "steelblue")
+ .attr("opacity", "1");
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ // hide the point
+ d3.select(this)
+ .attr("stroke", "white")
.attr("fill", "white")
- .attr("opacity", "0")
- .attr("cx", function(d) { return x(d.x); })
- .attr("cy", function(d) { return y(d.y); })
- .attr("r", function(d) { return 3; })
- .on('mouseover', function(d) {
- var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x];
- showBootstrapTooltip(d3.select(this).node(), tip);
- //showGraphTooltip(tip, d3.event.pageX + 5, d3.event.pageY - 25);
- // show the point
- d3.select(this)
- .attr("stroke", "steelblue")
- .attr("fill", "steelblue")
- .attr("opacity", "1");
- })
- .on('mouseout', function() {
- hideBootstrapTooltip(d3.select(this).node());
- //hideGraphTooltip();
- // hide the point
- d3.select(this)
- .attr("stroke", "white")
- .attr("fill", "white")
- .attr("opacity", "0");
- });
-
+ .attr("opacity", "0");
+ });
}
/**
* @param id the `id` used in the html `div` tag
- * @param data the data for the distribution graph
+ * @param values the data for the histogram graph
* @param minY the min value of Y axis
* @param maxY the max value of Y axis
* @param unitY the unit of Y axis
+ * @param batchInterval if "batchInterval" is specified, we will draw a line for "batchInterval" in the graph
*/
-function drawDistribution(id, values, minY, maxY, unitY, batchTime) {
- d3.select(d3.select(id).node().parentNode).style("padding", "8px 8px 8px 0").style("border-left", "0px solid white");
+function drawHistogram(id, values, minY, maxY, unitY, batchInterval) {
+ // Hide the left border of "<td>". We cannot use "css" directly, or "sorttable.js" will override them.
+ d3.select(d3.select(id).node().parentNode)
+ .style("padding", "8px 8px 8px 0")
+ .style("border-left", "0px solid white");
+
var margin = {top: 20, right: 30, bottom: 30, left: 10};
var width = 300 - margin.left - margin.right;
var height = 150 - margin.top - margin.bottom;
- var formatBinValue = d3.format(",.2f");
-
+ var x = d3.scale.linear().domain([0, maxXForHistogram]).range([0, width]);
var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
- var data = d3.layout.histogram().range([minY, maxY]).bins(binCount)(values);
-
- var x = d3.scale.linear()
- .domain([distributionMinX, distributionMaxX])
- .range([0, width]);
var xAxis = d3.svg.axis().scale(x).orient("top").ticks(5);
var yAxis = d3.svg.axis().scale(y).orient("left").ticks(0).tickFormat(function(d) { return ""; });
+ var data = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values);
+
var svg = d3.select(id).append("svg")
.attr("width", width + margin.left + margin.right)
.attr("height", height + margin.top + margin.bottom)
.append("g")
.attr("transform", "translate(" + margin.left + "," + margin.top + ")");
- if (batchTime) {
- var batchTimeline = d3.svg.line()
- .x(function(d) { return x(d.x); })
- .y(function(d) { return y(d.y); });
-
- console.log(batchTime);
- var batchTimeData = [
- {x: distributionMinX, y: batchTime}, {x: distributionMaxX, y: batchTime}
- ];
- console.log(batchTimeData);
-
- svg.append("path")
- .datum(batchTimeData)
- .style("stroke-dasharray", ("6, 6"))
- .style("stroke", "lightblue")
- .attr("class", "line")
- .attr("d", batchTimeline);
+ if (batchInterval && batchInterval <= maxY) {
+ drawLine(svg, x, y, 0, batchInterval, maxXForHistogram, batchInterval);
}
svg.append("g")
.attr("class", "x axis")
- .attr("transform", "translate(0," + 0 + ")")
.call(xAxis)
svg.append("g")
@@ -220,82 +213,58 @@ function drawDistribution(id, values, minY, maxY, unitY, batchTime) {
var bar = svg.selectAll(".bar")
.data(data)
- .enter().append("g")
- .attr("transform", function(d) { return "translate(0," + (y(d.x) - height + y(d.dx)) + ")";})
- .attr("class", "bar").append("rect")
- .attr("width", function(d) { return x(d.y); })
- .attr("height", function(d) { return height - y(d.dx); })
- .on('mouseover', function(d) {
- var tip = d.y + " between " + formatBinValue(d.x) + " and " + formatBinValue(d.x + d.dx) + " " + unitY;
- showBootstrapTooltip(d3.select(this).node(), tip);
-
- // Calculate the location for tip
- var scrollTop = document.documentElement.scrollTop || document.body.scrollTop;
- var scrollLeft = document.documentElement.scrollLeft || document.body.scrollLeft;
- var target = d3.event.target;
- var matrix = target.getScreenCTM();
- var targetBBox = target.getBBox();
- var point = svg.node().ownerSVGElement.createSVGPoint();
- point.x = targetBBox.x;
- point.y = targetBBox.y;
- var bbox = point.matrixTransform(matrix);
- var tipX = bbox.x + scrollLeft;
- var tipY = bbox.y + scrollTop + 15;
-
- //showGraphTooltip(tip, tipX, tipY);
- })
- .on('mouseout', function() {
- hideBootstrapTooltip(d3.select(this).node());
- //hideGraphTooltip();
- });
-
- if (batchTime && batchTime <= maxY) {
+ .enter()
+ .append("g")
+ .attr("transform", function(d) { return "translate(0," + (y(d.x) - height + y(d.dx)) + ")";})
+ .attr("class", "bar").append("rect")
+ .attr("width", function(d) { return x(d.y); })
+ .attr("height", function(d) { return height - y(d.dx); })
+ .on('mouseover', function(d) {
+ var percent = yValueFormat(d.y * 100.0 / values.length) + "%";
+ var tip = d.y + " batches (" + percent + ") between " + yValueFormat(d.x) + " and " + yValueFormat(d.x + d.dx) + " " + unitY;
+ showBootstrapTooltip(d3.select(this).node(), tip);
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ });
+
+ if (batchInterval && batchInterval <= maxY) {
+ // Add the "stable" text to the graph below the batch interval line.
+ var stableXOffset = x(maxXForHistogram) - 20;
+ var stableYOffset = y(batchInterval) + 15;
svg.append("text")
- .style("fill", "lightblue")
- .attr("class", "stable-text")
- .attr("text-anchor", "middle")
- .attr("transform", "translate(" + (x(distributionMaxX)-20) +"," + (y(batchTime) + 15) + ")")
- .text("stable")
- .on('mouseover', function(d) {
- var tip = "Processing Time <= Batch Interval (" + formatBinValue(batchTime) +" " + unitY +")";
- showBootstrapTooltip(d3.select(this).node(), tip);
- })
- .on('mouseout', function() {
- hideBootstrapTooltip(d3.select(this).node());
- });
+ .style("fill", "lightblue")
+ .attr("class", "stable-text")
+ .attr("text-anchor", "middle")
+ .attr("transform", "translate(" + stableXOffset + "," + stableYOffset + ")")
+ .text("stable")
+ .on('mouseover', function(d) {
+ var tip = "Processing Time <= Batch Interval (" + yValueFormat(batchInterval) +" " + unitY +")";
+ showBootstrapTooltip(d3.select(this).node(), tip);
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ });
}
}
-function prepareTimeline(minY, maxY) {
- var formatYValue = d3.format(",.2f");
- var numOfChars = formatYValue(maxY).length;
- var maxPx = numOfChars * 8 + 10;
- // Make sure we have enough space to show the ticks in the y axis of timeline
- timelineMarginLeft = maxPx > timelineMarginLeft? maxPx : timelineMarginLeft;
-}
-
-function prepareDistribution(values, minY, maxY) {
- var data = d3.layout.histogram().range([minY, maxY]).bins(binCount)(values);
- var maxBarSize = d3.max(data, function(d) { return d.y; });
- distributionMaxX = maxBarSize > distributionMaxX? maxBarSize : distributionMaxX;
-}
-
-function getUrlParameter(sParam)
-{
- var sPageURL = window.location.search.substring(1);
- var sURLVariables = sPageURL.split('&');
- for (var i = 0; i < sURLVariables.length; i++)
+$(function() {
+ function getParameterFromURL(param)
{
- var sParameterName = sURLVariables[i].split('=');
- if (sParameterName[0] == sParam)
+ var parameters = window.location.search.substring(1); // Remove "?"
+ var keyValues = parameters.split('&');
+ for (var i = 0; i < keyValues.length; i++)
{
- return sParameterName[1];
+ var paramKeyValue = keyValues[i].split('=');
+ if (paramKeyValue[0] == param)
+ {
+ return paramKeyValue[1];
+ }
}
}
-}
-$(function() {
- if (getUrlParameter("show-receivers-detail") == "true") {
+ if (getParameterFromURL("show-receivers-detail") == "true") {
+ // Show the details for all receivers
$('#inputs-table').toggle('collapsed');
$('#triangle').html('▼');
}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index db0836533388b..1ad4f48506fda 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -49,8 +49,8 @@ private[spark] object UIUtils extends Logging {
}
/**
- * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value after converting with
- * the `TimeUnit`.
+ * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
+ * after converting, also with its TimeUnit.
*/
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
if (milliseconds < 1000) {
@@ -65,7 +65,11 @@ private[spark] object UIUtils extends Logging {
return (minutes, TimeUnit.MINUTES)
}
val hours = minutes / 60
- (hours, TimeUnit.HOURS)
+ if (hours < 24) {
+ return (hours, TimeUnit.HOURS)
+ }
+ val days = hours / 24
+ (days, TimeUnit.DAYS)
}
def formatDate(date: Date): String = dateFormat.get.format(date)
diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
new file mode 100644
index 0000000000000..636b2c389e05e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.ui
+
+import java.util.concurrent.TimeUnit
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+class UIUtilsSuite extends FunSuite with Matchers {
+
+  test("shortTimeUnitString") {
+    UIUtils.shortTimeUnitString(TimeUnit.NANOSECONDS) should be ("ns")
+    UIUtils.shortTimeUnitString(TimeUnit.MICROSECONDS) should be ("us")
+    UIUtils.shortTimeUnitString(TimeUnit.MILLISECONDS) should be ("ms")
+    UIUtils.shortTimeUnitString(TimeUnit.SECONDS) should be ("sec")
+    UIUtils.shortTimeUnitString(TimeUnit.MINUTES) should be ("min")
+    UIUtils.shortTimeUnitString(TimeUnit.HOURS) should be ("hrs")
+    UIUtils.shortTimeUnitString(TimeUnit.DAYS) should be ("days")
+  }
+
+  test("normalizeDuration") {
+    verifyNormalizedTime(900, TimeUnit.MILLISECONDS, 900)
+    verifyNormalizedTime(1.0, TimeUnit.SECONDS, TimeUnit.SECONDS.toMillis(1))
+    verifyNormalizedTime(1.0, TimeUnit.MINUTES, TimeUnit.MINUTES.toMillis(1))
+    verifyNormalizedTime(1.0, TimeUnit.HOURS, TimeUnit.HOURS.toMillis(1))
+    verifyNormalizedTime(1.0, TimeUnit.DAYS, TimeUnit.DAYS.toMillis(1))
+  }
+
+  /** Normalizes `millis` and checks both the resulting value and its unit. */
+  private def verifyNormalizedTime(expected: Double, unit: TimeUnit, millis: Long): Unit = {
+    val normalized = UIUtils.normalizeDuration(millis)
+    normalized._1 should be (expected +- 1E-6)
+    normalized._2 should be (unit)
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index d7e39c528c519..5bf2097333fbe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -32,6 +32,7 @@ case class ReceiverInfo(
active: Boolean,
location: String,
lastErrorMessage: String = "",
- lastError: String = ""
+ lastError: String = "",
+ lastErrorTime: Long = -1L
) {
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 98900473138fe..a462298fe3345 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -155,10 +155,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
private def deregisterReceiver(streamId: Int, message: String, error: String) {
val newReceiverInfo = receiverInfo.get(streamId) match {
case Some(oldInfo) =>
- oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error)
+ oldInfo.copy(actor = null, active = false, lastErrorMessage = message,
+ lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
case None =>
logWarning("No prior receiver info")
- ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+ ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+ lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
}
receiverInfo -= streamId
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
@@ -182,7 +184,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
- oldInfo.copy(lastErrorMessage = message, lastError = error)
+ // Record when this error was reported, exactly as the "None" branch below does
+ oldInfo.copy(lastErrorMessage = message, lastError = error,
+ lastErrorTime = ssc.scheduler.clock.getTimeMillis())
case None =>
logWarning("No prior receiver info")
- ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+ ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
+ lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
}
receiverInfo(streamId) = newReceiverInfo
listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index df1c0a10704c3..61fa86838861c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -27,7 +27,7 @@ private[ui] abstract class BatchTableBase(tableId: String) {
protected def columns: Seq[Node] = {
| Batch Time |
Input Size |
- Scheduling Delay |
+ Streaming Scheduling Delay |
Processing Time |
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 0b5402a6d9fce..682a9e05fec6d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
-import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -39,7 +38,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
- private val receiverLastErrorTime = new HashMap[Int, Long]
val batchDuration = ssc.graph.batchDuration.milliseconds
@@ -52,7 +50,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onReceiverError(receiverError: StreamingListenerReceiverError) {
synchronized {
receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
- receiverLastErrorTime(receiverError.receiverInfo.streamId) = System.currentTimeMillis()
}
}
@@ -123,32 +120,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedBatchInfos.toSeq
}
- def processingDelayDistribution: Option[Distribution] = synchronized {
- extractDistribution(_.processingDelay)
- }
-
- def schedulingDelayDistribution: Option[Distribution] = synchronized {
- extractDistribution(_.schedulingDelay)
- }
-
- def totalDelayDistribution: Option[Distribution] = synchronized {
- extractDistribution(_.totalDelay)
- }
-
- def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
- val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
- val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
- (0 until numReceivers).map { receiverId =>
- val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
- batchInfo.get(receiverId).getOrElse(Array.empty)
- }
- val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
- // calculate records per second for each batch
- blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
- }
- val distributionOption = Distribution(recordsOfParticularReceiver)
- (receiverId, distributionOption)
- }.toMap
+ def allReceivers: Seq[Int] = synchronized {
+ receiverInfos.keys.toSeq
}
def receivedRecordsWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
@@ -184,10 +157,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
receiverInfos.get(receiverId)
}
- def receiverLastErrorTime(receiverId: Int): Option[Long] = synchronized {
- receiverLastErrorTime.get(receiverId)
- }
-
def lastCompletedBatch: Option[BatchInfo] = synchronized {
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
}
@@ -200,8 +169,4 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
(waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
}
-
- private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
- Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
- }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 4ff77e6c09141..f69fdb010c007 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -28,7 +28,6 @@ import scala.xml.{Node, Unparsed}
import org.apache.spark.Logging
import org.apache.spark.ui._
import org.apache.spark.ui.UIUtils._
-import org.apache.spark.util.Distribution
/**
* @param divId the `id` used in the html `div` tag
@@ -38,18 +37,22 @@ import org.apache.spark.util.Distribution
* @param minY the min value of Y axis
* @param maxY the max value of Y axis
* @param unitY the unit of Y axis
+ * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in
+ * the graph
*/
private[ui] class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: Long, maxX: Long,
- minY: Double, maxY: Double, unitY: String, batchTime: Option[Double] = None) {
+ minY: Double, maxY: Double, unitY: String, batchInterval: Option[Double] = None) {
def toHtml(jsCollector: JsCollector): Seq[Node] = {
val jsForData = data.map { case (x, y) =>
s"""{"x": $x, "y": $y}"""
}.mkString("[", ",", "]")
- jsCollector.addPreparedStatement(s"prepareTimeline($minY, $maxY);")
- if (batchTime.isDefined) {
+ jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);")
+ if (batchInterval.isDefined) {
jsCollector.addStatement(
- s"drawTimeline('#$divId', $jsForData, $minX, $maxX, $minY, $maxY, '$unitY', ${batchTime.get});")
+ "drawTimeline(" +
+ s"'#$divId', $jsForData, $minX, $maxX, $minY, $maxY, '$unitY', ${batchInterval.get}" +
+ ");")
} else {
jsCollector.addStatement(
s"drawTimeline('#$divId', $jsForData, $minX, $maxX, $minY, $maxY, '$unitY');")
@@ -60,31 +63,48 @@ private[ui] class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: Long
/**
* @param divId the `id` used in the html `div` tag
- * @param data the data for the distribution graph
+ * @param data the data for the histogram graph
* @param minY the min value of Y axis
* @param maxY the max value of Y axis
* @param unitY the unit of Y axis
+ * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in
+ * the graph
*/
-private[ui] class DistributionUIData(
- divId: String, data: Seq[_], minY: Double, maxY: Double, unitY: String, batchTime: Option[Double] = None) {
+private[ui] class HistogramUIData(
+ divId: String, data: Seq[_], minY: Double, maxY: Double, unitY: String,
+ batchInterval: Option[Double] = None) {
def toHtml(jsCollector: JsCollector): Seq[Node] = {
val jsForData = data.mkString("[", ",", "]")
- jsCollector.addPreparedStatement(s"prepareDistribution($jsForData, $minY, $maxY);")
- if (batchTime.isDefined) {
- jsCollector.addStatement(s"drawDistribution('#$divId', $jsForData, $minY, $maxY, '$unitY', ${batchTime.get});")
+ jsCollector.addPreparedStatement(s"registerHistogram($jsForData, $minY, $maxY);")
+ if (batchInterval.isDefined) {
+ jsCollector.addStatement(
+ "drawHistogram(" +
+ s"'#$divId', $jsForData, $minY, $maxY, '$unitY', ${batchInterval.get}" +
+ ");")
} else {
- jsCollector.addStatement(s"drawDistribution('#$divId', $jsForData, $minY, $maxY, '$unitY');")
+ jsCollector.addStatement(s"drawHistogram('#$divId', $jsForData, $minY, $maxY, '$unitY');")
}
}
}
+/**
+ * @param data (batchTime, milliseconds). "milliseconds" is something like "processing time".
+ */
private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
- def timelineData(unit: TimeUnit) = data.map(x => x._1 -> StreamingPage.convertToTimeUnit(x._2, unit))
+ /**
+ * Converting the original data as per `unit`.
+ */
+ def timelineData(unit: TimeUnit): Seq[(Long, Double)] =
+ data.map(x => x._1 -> StreamingPage.convertToTimeUnit(x._2, unit))
- def distributionData(unit: TimeUnit) = data.map(x => StreamingPage.convertToTimeUnit(x._2, unit))
+ /**
+ * Converting the original data as per `unit`.
+ */
+ def histogramData(unit: TimeUnit): Seq[Double] =
+ data.map(x => StreamingPage.convertToTimeUnit(x._2, unit))
val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
@@ -93,7 +113,10 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
val max: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).max)
}
-private[ui] class DoubleStatUIData(val data: Seq[(Long, Double)]) {
+/**
+ * @param data (batchTime, event-rate).
+ */
+private[ui] class EventRateUIData(val data: Seq[(Long, Double)]) {
val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
@@ -154,6 +177,14 @@ private[ui] class StreamingPage(parent: StreamingTab)
}
+ /**
+ * Generate a global "timeFormat" dictionary in the JavaScript to store the time and its formatted
+ * string. Because we cannot specify a timezone in JavaScript, to make sure the server and client
+ * use the same timezone, we use the "timeFormat" dictionary to format all time values used in the
+ * graphs.
+ *
+ * @param times all time values that will be used in the graphs.
+ */
private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
val dateFormat = new SimpleDateFormat("HH:mm:ss")
val js = "var timeFormat = {};\n" + times.map { time =>
@@ -165,28 +196,26 @@ private[ui] class StreamingPage(parent: StreamingTab)
}
private def generateStatTable(): Seq[Node] = {
- val batchInfos = listener.retainedBatches
+ val batches = listener.retainedBatches
- val batchTimes = batchInfos.map(_.batchTime.milliseconds)
+ val batchTimes = batches.map(_.batchTime.milliseconds)
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
- val eventRateForAllReceivers = new DoubleStatUIData(batchInfos.map { batchInfo =>
+ val eventRateForAllReceivers = new EventRateUIData(batches.map { batchInfo =>
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
})
- val schedulingDelay = new MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
+ val schedulingDelay = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _)
})
- val processingTime = new MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
+ val processingTime = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _)
})
- val totalDelay = new MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
+ val totalDelay = new MillisecondsStatUIData(batches.flatMap { batchInfo =>
batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _)
})
- val jsCollector = new JsCollector
-
// Use the max value of "schedulingDelay", "processingTime", and "totalDelay" to make the
// Y axis ranges same.
val _maxTime =
@@ -194,8 +223,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
m1 max m2 max m3).getOrElse(0L)
// Should start at 0
val minTime = 0L
- val (maxTime, unit) = UIUtils.normalizeDuration(_maxTime)
- val formattedUnit = UIUtils.shortTimeUnitString(unit)
+ val (maxTime, normalizedUnit) = UIUtils.normalizeDuration(_maxTime)
+ val formattedUnit = UIUtils.shortTimeUnitString(normalizedUnit)
// Use the max input rate for all receivers' graphs to make the Y axis ranges same.
// If it's not an integral number, just use its ceil integral number.
@@ -209,9 +238,13 @@ private[ui] class StreamingPage(parent: StreamingTab)
|if ($$(this).html() == '$BLACK_RIGHT_TRIANGLE_HTML') {
|$$(this).html('$BLACK_DOWN_TRIANGLE_HTML');status = true;}
|else {$$(this).html('$BLACK_RIGHT_TRIANGLE_HTML');status = false;}
- |window.history.pushState('', document.title, window.location.pathname + '?show-receivers-detail=' + status);""".stripMargin.replaceAll("\\n", "")
+ |window.history.pushState('',
+ | document.title, window.location.pathname + '?show-receivers-detail=' + status);"""
+ .stripMargin.replaceAll("\\n", "") // the generated JavaScript must be on a single line
- var batchTime = StreamingPage.convertToTimeUnit(listener.batchDuration, unit)
+ val batchInterval = StreamingPage.convertToTimeUnit(listener.batchDuration, normalizedUnit)
+
+ val jsCollector = new JsCollector
val timelineDataForEventRateOfAllReceivers =
new TimelineUIData(
@@ -223,9 +256,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
maxEventRate,
"events/sec").toHtml(jsCollector)
- val distributionDataForEventRateOfAllReceivers =
- new DistributionUIData(
- "all-receiver-events-distribution",
+ val histogramDataForEventRateOfAllReceivers =
+ new HistogramUIData(
+ "all-receiver-events-histogram",
eventRateForAllReceivers.data.map(_._2),
minEventRate,
maxEventRate,
@@ -234,17 +267,17 @@ private[ui] class StreamingPage(parent: StreamingTab)
val timelineDataForSchedulingDelay =
new TimelineUIData(
"scheduling-delay-timeline",
- schedulingDelay.timelineData(unit),
+ schedulingDelay.timelineData(normalizedUnit),
minBatchTime,
maxBatchTime,
minTime,
maxTime,
formattedUnit).toHtml(jsCollector)
- val distributionDataForSchedulingDelay =
- new DistributionUIData(
- "scheduling-delay-distribution",
- schedulingDelay.distributionData(unit),
+ val histogramDataForSchedulingDelay =
+ new HistogramUIData(
+ "scheduling-delay-histogram",
+ schedulingDelay.histogramData(normalizedUnit),
minTime,
maxTime,
formattedUnit).toHtml(jsCollector)
@@ -252,58 +285,63 @@ private[ui] class StreamingPage(parent: StreamingTab)
val timelineDataForProcessingTime =
new TimelineUIData(
"processing-time-timeline",
- processingTime.timelineData(unit),
+ processingTime.timelineData(normalizedUnit),
minBatchTime,
maxBatchTime,
minTime,
maxTime,
- formattedUnit, Some(batchTime)).toHtml(jsCollector)
+ formattedUnit, Some(batchInterval)).toHtml(jsCollector)
- val distributionDataForProcessingTime =
- new DistributionUIData(
- "processing-time-distribution",
- processingTime.distributionData(unit),
+ val histogramDataForProcessingTime =
+ new HistogramUIData(
+ "processing-time-histogram",
+ processingTime.histogramData(normalizedUnit),
minTime,
maxTime,
- formattedUnit, Some(batchTime)).toHtml(jsCollector)
+ formattedUnit, Some(batchInterval)).toHtml(jsCollector)
val timelineDataForTotalDelay =
new TimelineUIData(
"total-delay-timeline",
- totalDelay.timelineData(unit),
+ totalDelay.timelineData(normalizedUnit),
minBatchTime,
maxBatchTime,
minTime,
maxTime,
formattedUnit).toHtml(jsCollector)
- val distributionDataForTotalDelay =
- new DistributionUIData(
- "total-delay-distribution",
- totalDelay.distributionData(unit),
+ val histogramDataForTotalDelay =
+ new HistogramUIData(
+ "total-delay-histogram",
+ totalDelay.histogramData(normalizedUnit),
minTime,
maxTime,
formattedUnit).toHtml(jsCollector)
+ val numCompletedBatches = listener.retainedCompletedBatches.size
+ val numActiveBatches = batchTimes.length - numCompletedBatches
val table =
// scalastyle:off
- | Timelines | Histograms |
+
+ |
+ Timelines (Last {batchTimes.length} batches, {numActiveBatches} active, {numCompletedBatches} completed) |
+ Histograms |
-
+ |
-
- {Unparsed(BLACK_RIGHT_TRIANGLE_HTML)}
- Input Rate
-
- Avg: {eventRateForAllReceivers.formattedAvg} events/sec
+
+ {Unparsed(BLACK_RIGHT_TRIANGLE_HTML)}
+ Input Rate
+
+ Avg: {eventRateForAllReceivers.formattedAvg} events/sec
|
{timelineDataForEventRateOfAllReceivers} |
- {distributionDataForEventRateOfAllReceivers} |
+ {histogramDataForEventRateOfAllReceivers} |
@@ -312,27 +350,33 @@ private[ui] class StreamingPage(parent: StreamingTab)
|
- Scheduling Delay
- Avg: {schedulingDelay.formattedAvg}
+
+ Streaming Scheduling Delay
+ Avg: {schedulingDelay.formattedAvg}
+
|
{timelineDataForSchedulingDelay} |
- {distributionDataForSchedulingDelay} |
+ {histogramDataForSchedulingDelay} |
- Processing Time
- Avg: {processingTime.formattedAvg}
+
+ Processing Time
+ Avg: {processingTime.formattedAvg}
+
|
{timelineDataForProcessingTime} |
- {distributionDataForProcessingTime} |
+ {histogramDataForProcessingTime} |
- Total Delay
- Avg: {totalDelay.formattedAvg}
+
+ Total Delay
+ Avg: {totalDelay.formattedAvg}
+
|
{timelineDataForTotalDelay} |
- {distributionDataForTotalDelay} |
+ {histogramDataForTotalDelay} |
@@ -347,10 +391,11 @@ private[ui] class StreamingPage(parent: StreamingTab)
maxX: Long,
minY: Double,
maxY: Double): Seq[Node] = {
- val content = listener.receivedRecordsDistributions.map { case (receiverId, distribution) =>
- generateInputReceiverRow(jsCollector, receiverId, distribution, minX, maxX, minY, maxY)
+ val content = listener.allReceivers.map { receiverId =>
+ generateInputReceiverRow(jsCollector, receiverId, minX, maxX, minY, maxY)
}.foldLeft[Seq[Node]](Nil)(_ ++ _)
+ // scalastyle:off
@@ -365,17 +410,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
{content}
+ // scalastyle:on
}
private def generateInputReceiverRow(
jsCollector: JsCollector,
receiverId: Int,
- distribution: Option[Distribution],
minX: Long,
maxX: Long,
minY: Double,
maxY: Double): Seq[Node] = {
- val avgReceiverEvents = distribution.map(_.statCounter.mean.toLong)
val receiverInfo = listener.receiverInfo(receiverId)
val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
val receiverActive = receiverInfo.map { info =>
@@ -386,9 +430,11 @@ private[ui] class StreamingPage(parent: StreamingTab)
val msg = s"${info.lastErrorMessage} - ${info.lastError}"
if (msg.size > 100) msg.take(97) + "..." else msg
}.getOrElse(emptyCell)
- val receiverLastErrorTime =
- listener.receiverLastErrorTime(receiverId).map(UIUtils.formatDate).getOrElse(emptyCell)
- val receivedRecords = new DoubleStatUIData(listener.receivedRecordsWithBatchTime.get(receiverId).getOrElse(Seq()))
+ val receiverLastErrorTime = receiverInfo.map {
+ r => if (r.lastErrorTime < 0) "-" else UIUtils.formatDate(r.lastErrorTime)
+ }.getOrElse(emptyCell)
+ val receivedRecords =
+ new EventRateUIData(listener.receivedRecordsWithBatchTime.get(receiverId).getOrElse(Seq()))
val timelineForEventRate =
new TimelineUIData(
@@ -400,9 +446,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
maxY,
"events/sec").toHtml(jsCollector)
- val distributionForEventsRate =
- new DistributionUIData(
- s"receiver-$receiverId-events-distribution",
+ val histogramForEventsRate =
+ new HistogramUIData(
+ s"receiver-$receiverId-events-histogram",
receivedRecords.data.map(_._2),
minY,
maxY,
@@ -410,31 +456,22 @@ private[ui] class StreamingPage(parent: StreamingTab)
-
-
- {receiverName}
+
+ {receiverName}
+ Avg: {receivedRecords.formattedAvg} events/sec
- Avg: {receivedRecords.formattedAvg} events/sec
-
|
{receiverActive} |
{receiverLocation} |
{receiverLastErrorTime} |
{receiverLastError} |
-
-
- {timelineForEventRate}
- |
- {distributionForEventsRate} |
-
- }
-
- /**
- * Returns a human-readable string representing a duration such as "5 second 35 ms"
- */
- private def formatDurationOption(msOption: Option[Long]): String = {
- msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+
+
+ {timelineForEventRate}
+ |
+ {histogramForEventsRate} |
+
}
private def generateBatchListTables(): Seq[Node] = {
@@ -472,9 +509,13 @@ private[ui] object StreamingPage {
msOption.map(formatDurationVerbose).getOrElse(emptyCell)
}
+ /**
+ * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
+ * will discard the fractional part.
+ */
def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
- case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000 // not used yet
- case TimeUnit.MICROSECONDS => milliseconds * 1000 // not used yet
+ case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
+ case TimeUnit.MICROSECONDS => milliseconds * 1000
case TimeUnit.MILLISECONDS => milliseconds
case TimeUnit.SECONDS => milliseconds / 1000.0
case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index a2f4f3fe19897..84c10ae6dc462 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -80,10 +80,12 @@ class UISeleniumSuite
h4Text should contain("Completed Batches (last 0 out of 0)")
findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
- List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status")
+ List("Batch Time", "Input Size", "Streaming Scheduling Delay", "Processing Time",
+ "Status")
}
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
- List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay")
+ List("Batch Time", "Input Size", "Streaming Scheduling Delay", "Processing Time",
+ "Total Delay")
}
}