diff --git a/LICENSE b/LICENSE index 21c42e9a20fa3..b2001f029a4f0 100644 --- a/LICENSE +++ b/LICENSE @@ -643,6 +643,36 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +======================================================================== +For d3 (core/src/main/resources/org/apache/spark/ui/static/d3.min.js): +======================================================================== + +Copyright (c) 2010-2015, Michael Bostock +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* The name Michael Bostock may not be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL MICHAEL BOSTOCK BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ======================================================================== For Scala Interpreter classes (all .scala files in repl/src/main/scala diff --git a/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js b/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js index 2934181c1006a..acd6096e6743e 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js +++ b/core/src/main/resources/org/apache/spark/ui/static/bootstrap-tooltip.js @@ -1,9 +1,9 @@ /* =========================================================== - * bootstrap-tooltip.js v2.2.2 - * http://twitter.github.com/bootstrap/javascript.html#tooltips + * bootstrap-tooltip.js v2.3.2 + * http://getbootstrap.com/2.3.2/javascript.html#tooltips * Inspired by the original jQuery.tipsy by Jason Frame * =========================================================== - * Copyright 2012 Twitter, Inc. + * Copyright 2013 Twitter, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,19 +38,27 @@ , init: function (type, element, options) { var eventIn , eventOut + , triggers + , trigger + , i this.type = type this.$element = $(element) this.options = this.getOptions(options) this.enabled = true - if (this.options.trigger == 'click') { - this.$element.on('click.' + this.type, this.options.selector, $.proxy(this.toggle, this)) - } else if (this.options.trigger != 'manual') { - eventIn = this.options.trigger == 'hover' ? 'mouseenter' : 'focus' - eventOut = this.options.trigger == 'hover' ? 'mouseleave' : 'blur' - this.$element.on(eventIn + '.' + this.type, this.options.selector, $.proxy(this.enter, this)) - this.$element.on(eventOut + '.' + this.type, this.options.selector, $.proxy(this.leave, this)) + triggers = this.options.trigger.split(' ') + + for (i = triggers.length; i--;) { + trigger = triggers[i] + if (trigger == 'click') { + this.$element.on('click.' + this.type, this.options.selector, $.proxy(this.toggle, this)) + } else if (trigger != 'manual') { + eventIn = trigger == 'hover' ? 'mouseenter' : 'focus' + eventOut = trigger == 'hover' ? 'mouseleave' : 'blur' + this.$element.on(eventIn + '.' + this.type, this.options.selector, $.proxy(this.enter, this)) + this.$element.on(eventOut + '.' + this.type, this.options.selector, $.proxy(this.leave, this)) + } } this.options.selector ? @@ -59,7 +67,7 @@ } , getOptions: function (options) { - options = $.extend({}, $.fn[this.type].defaults, options, this.$element.data()) + options = $.extend({}, $.fn[this.type].defaults, this.$element.data(), options) if (options.delay && typeof options.delay == 'number') { options.delay = { @@ -72,7 +80,15 @@ } , enter: function (e) { - var self = $(e.currentTarget)[this.type](this._options).data(this.type) + var defaults = $.fn[this.type].defaults + , options = {} + , self + + this._options && $.each(this._options, function (key, value) { + if (defaults[key] != value) options[key] = value + }, this) + + self = $(e.currentTarget)[this.type](options).data(this.type) if (!self.options.delay || !self.options.delay.show) return self.show() @@ -97,14 +113,16 @@ , show: function () { var $tip - , inside , pos , actualWidth , actualHeight , placement , tp + , e = $.Event('show') if (this.hasContent() && this.enabled) { + this.$element.trigger(e) + if (e.isDefaultPrevented()) return $tip = this.tip() this.setContent() @@ -116,19 +134,18 @@ this.options.placement.call(this, $tip[0], this.$element[0]) : this.options.placement - inside = /in/.test(placement) - $tip .detach() .css({ top: 0, left: 0, display: 'block' }) - .insertAfter(this.$element) - pos = this.getPosition(inside) + this.options.container ? $tip.appendTo(this.options.container) : $tip.insertAfter(this.$element) + + pos = this.getPosition() actualWidth = $tip[0].offsetWidth actualHeight = $tip[0].offsetHeight - switch (inside ? placement.split(' ')[1] : placement) { + switch (placement) { case 'bottom': tp = {top: pos.top + pos.height, left: pos.left + pos.width / 2 - actualWidth / 2} break @@ -143,11 +160,56 @@ break } - $tip - .offset(tp) - .addClass(placement) - .addClass('in') + this.applyPlacement(tp, placement) + this.$element.trigger('shown') + } + } + + , applyPlacement: function(offset, placement){ + var $tip = this.tip() + , width = $tip[0].offsetWidth + , height = $tip[0].offsetHeight + , actualWidth + , actualHeight + , delta + , replace + + $tip + .offset(offset) + .addClass(placement) + .addClass('in') + + actualWidth = $tip[0].offsetWidth + actualHeight = $tip[0].offsetHeight + + if (placement == 'top' && actualHeight != height) { + offset.top = offset.top + height - actualHeight + replace = true + } + + if (placement == 'bottom' || placement == 'top') { + delta = 0 + + if (offset.left < 0){ + delta = offset.left * -2 + offset.left = 0 + $tip.offset(offset) + actualWidth = $tip[0].offsetWidth + actualHeight = $tip[0].offsetHeight + } + + this.replaceArrow(delta - width + actualWidth, actualWidth, 'left') + } else { + this.replaceArrow(actualHeight - height, actualHeight, 'top') } + + if (replace) $tip.offset(offset) + } + + , replaceArrow: function(delta, dimension, position){ + this + .arrow() + .css(position, delta ? (50 * (1 - delta / dimension) + "%") : '') } , setContent: function () { @@ -161,6 +223,10 @@ , hide: function () { var that = this , $tip = this.tip() + , e = $.Event('hide') + + this.$element.trigger(e) + if (e.isDefaultPrevented()) return $tip.removeClass('in') @@ -179,6 +245,8 @@ removeWithAnimation() : $tip.detach() + this.$element.trigger('hidden') + return this } @@ -193,11 +261,12 @@ return this.getTitle() } - , getPosition: function (inside) { - return $.extend({}, (inside ? {top: 0, left: 0} : this.$element.offset()), { - width: this.$element[0].offsetWidth - , height: this.$element[0].offsetHeight - }) + , getPosition: function () { + var el = this.$element[0] + return $.extend({}, (typeof el.getBoundingClientRect == 'function') ? el.getBoundingClientRect() : { + width: el.offsetWidth + , height: el.offsetHeight + }, this.$element.offset()) } , getTitle: function () { @@ -215,6 +284,10 @@ return this.$tip = this.$tip || $(this.options.template) } + , arrow: function(){ + return this.$arrow = this.$arrow || this.tip().find(".tooltip-arrow") + } + , validate: function () { if (!this.$element[0].parentNode) { this.hide() @@ -236,8 +309,8 @@ } , toggle: function (e) { - var self = $(e.currentTarget)[this.type](this._options).data(this.type) - self[self.tip().hasClass('in') ? 'hide' : 'show']() + var self = e ? $(e.currentTarget)[this.type](this._options).data(this.type) : this + self.tip().hasClass('in') ? self.hide() : self.show() } , destroy: function () { @@ -269,10 +342,11 @@ , placement: 'top' , selector: false , template: '
' - , trigger: 'hover' + , trigger: 'hover focus' , title: '' , delay: 0 , html: false + , container: false } @@ -285,4 +359,3 @@ } }(window.jQuery); - diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css new file mode 100644 index 0000000000000..5da9d631ad124 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +.graph { + font: 10px sans-serif; +} + +.axis path, .axis line { + fill: none; + stroke: gray; + shape-rendering: crispEdges; +} + +.axis text { + fill: gray; +} + +.tooltip-inner { + max-width: 500px !important; // Make sure we only have one line tooltip +} + +.line { + fill: none; + stroke: #0088cc; + stroke-width: 1.5px; +} + +.bar rect { + fill: #0088cc; + shape-rendering: crispEdges; +} + +.bar rect:hover { + fill: #00c2ff; +} + +.timeline { + width: 500px; +} + +.histogram { + width: auto; +} diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js new file mode 100644 index 0000000000000..a4e03b156f13e --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +// timeFormat: StreamingPage.scala will generate a global "timeFormat" dictionary to store the time +// and its formatted string. Because we cannot specify a timezone in JavaScript, to make sure the +// server and client use the same timezone, we use the "timeFormat" dictionary to format all time +// values used in the graphs. + +// A global margin left for all timeline graphs. It will be set in "registerTimeline". This will be +// used to align all timeline graphs. +var maxMarginLeftForTimeline = 0; + +// The max X values for all histograms. It will be set in "registerHistogram". +var maxXForHistogram = 0; + +var histogramBinCount = 10; +var yValueFormat = d3.format(",.2f"); + +// Show a tooltip "text" for "node" +function showBootstrapTooltip(node, text) { + $(node).tooltip({title: text, trigger: "manual", container: "body"}); + $(node).tooltip("show"); +} + +// Hide the tooltip for "node" +function hideBootstrapTooltip(node) { + $(node).tooltip("destroy"); +} + +// Register a timeline graph. All timeline graphs should be register before calling any +// "drawTimeline" so that we can determine the max margin left for all timeline graphs. +function registerTimeline(minY, maxY) { + var numOfChars = yValueFormat(maxY).length; + // A least width for "maxY" in the graph + var pxForMaxY = numOfChars * 8 + 10; + // Make sure we have enough space to show the ticks in the y axis of timeline + maxMarginLeftForTimeline = pxForMaxY > maxMarginLeftForTimeline? pxForMaxY : maxMarginLeftForTimeline; +} + +// Register a histogram graph. All histogram graphs should be register before calling any +// "drawHistogram" so that we can determine the max X value for histograms. +function registerHistogram(values, minY, maxY) { + var data = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values); + // d.x is the y values while d.y is the x values + var maxX = d3.max(data, function(d) { return d.y; }); + maxXForHistogram = maxX > maxXForHistogram ? maxX : maxXForHistogram; +} + +// Draw a line between (x1, y1) and (x2, y2) +function drawLine(svg, xFunc, yFunc, x1, y1, x2, y2) { + var line = d3.svg.line() + .x(function(d) { return xFunc(d.x); }) + .y(function(d) { return yFunc(d.y); }); + var data = [{x: x1, y: y1}, {x: x2, y: y2}]; + svg.append("path") + .datum(data) + .style("stroke-dasharray", ("6, 6")) + .style("stroke", "lightblue") + .attr("class", "line") + .attr("d", line); +} + +/** + * @param id the `id` used in the html `div` tag + * @param data the data for the timeline graph + * @param minX the min value of X axis + * @param maxX the max value of X axis + * @param minY the min value of Y axis + * @param maxY the max value of Y axis + * @param unitY the unit of Y axis + * @param batchInterval if "batchInterval" is specified, we will draw a line for "batchInterval" in the graph + */ +function drawTimeline(id, data, minX, maxX, minY, maxY, unitY, batchInterval) { + // Hide the right border of "". We cannot use "css" directly, or "sorttable.js" will override them. + d3.select(d3.select(id).node().parentNode) + .style("padding", "8px 0 8px 8px") + .style("border-right", "0px solid white"); + + var margin = {top: 20, right: 27, bottom: 30, left: maxMarginLeftForTimeline}; + var width = 500 - margin.left - margin.right; + var height = 150 - margin.top - margin.bottom; + + var x = d3.scale.linear().domain([minX, maxX]).range([0, width]); + var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]); + + var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; }); + var formatYValue = d3.format(",.2f"); + var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5).tickFormat(formatYValue); + + var line = d3.svg.line() + .x(function(d) { return x(d.x); }) + .y(function(d) { return y(d.y); }); + + var svg = d3.select(id).append("svg") + .attr("width", width + margin.left + margin.right) + .attr("height", height + margin.top + margin.bottom) + .append("g") + .attr("transform", "translate(" + margin.left + "," + margin.top + ")"); + + // Only show the first and last time in the graph + xAxis.tickValues(x.domain()); + + svg.append("g") + .attr("class", "x axis") + .attr("transform", "translate(0," + height + ")") + .call(xAxis) + + svg.append("g") + .attr("class", "y axis") + .call(yAxis) + .append("text") + .attr("transform", "translate(0," + (-3) + ")") + .text(unitY); + + + if (batchInterval && batchInterval <= maxY) { + drawLine(svg, x, y, minX, batchInterval, maxX, batchInterval); + } + + svg.append("path") + .datum(data) + .attr("class", "line") + .attr("d", line); + + // Add points to the line. However, we make it invisible at first. But when the user moves mouse + // over a point, it will be displayed with its detail. + svg.selectAll(".point") + .data(data) + .enter().append("circle") + .attr("stroke", "white") // white and opacity = 0 make it invisible + .attr("fill", "white") + .attr("opacity", "0") + .attr("cx", function(d) { return x(d.x); }) + .attr("cy", function(d) { return y(d.y); }) + .attr("r", function(d) { return 3; }) + .on('mouseover', function(d) { + var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x]; + showBootstrapTooltip(d3.select(this).node(), tip); + // show the point + d3.select(this) + .attr("stroke", "steelblue") + .attr("fill", "steelblue") + .attr("opacity", "1"); + }) + .on('mouseout', function() { + hideBootstrapTooltip(d3.select(this).node()); + // hide the point + d3.select(this) + .attr("stroke", "white") + .attr("fill", "white") + .attr("opacity", "0"); + }) + .on("click", function(d) { + window.location.href = "batch/?id=" + d.x; + }); +} + +/** + * @param id the `id` used in the html `div` tag + * @param values the data for the histogram graph + * @param minY the min value of Y axis + * @param maxY the max value of Y axis + * @param unitY the unit of Y axis + * @param batchInterval if "batchInterval" is specified, we will draw a line for "batchInterval" in the graph + */ +function drawHistogram(id, values, minY, maxY, unitY, batchInterval) { + // Hide the left border of "". We cannot use "css" directly, or "sorttable.js" will override them. + d3.select(d3.select(id).node().parentNode) + .style("padding", "8px 8px 8px 0") + .style("border-left", "0px solid white"); + + var margin = {top: 20, right: 30, bottom: 30, left: 10}; + var width = 300 - margin.left - margin.right; + var height = 150 - margin.top - margin.bottom; + + var x = d3.scale.linear().domain([0, maxXForHistogram]).range([0, width]); + var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]); + + var xAxis = d3.svg.axis().scale(x).orient("top").ticks(5); + var yAxis = d3.svg.axis().scale(y).orient("left").ticks(0).tickFormat(function(d) { return ""; }); + + var data = d3.layout.histogram().range([minY, maxY]).bins(histogramBinCount)(values); + + var svg = d3.select(id).append("svg") + .attr("width", width + margin.left + margin.right) + .attr("height", height + margin.top + margin.bottom) + .append("g") + .attr("transform", "translate(" + margin.left + "," + margin.top + ")"); + + if (batchInterval && batchInterval <= maxY) { + drawLine(svg, x, y, 0, batchInterval, maxXForHistogram, batchInterval); + } + + svg.append("g") + .attr("class", "x axis") + .call(xAxis) + + svg.append("g") + .attr("class", "y axis") + .call(yAxis) + + var bar = svg.selectAll(".bar") + .data(data) + .enter() + .append("g") + .attr("transform", function(d) { return "translate(0," + (y(d.x) - height + y(d.dx)) + ")";}) + .attr("class", "bar").append("rect") + .attr("width", function(d) { return x(d.y); }) + .attr("height", function(d) { return height - y(d.dx); }) + .on('mouseover', function(d) { + var percent = yValueFormat(d.y * 100.0 / values.length) + "%"; + var tip = d.y + " batches (" + percent + ") between " + yValueFormat(d.x) + " and " + yValueFormat(d.x + d.dx) + " " + unitY; + showBootstrapTooltip(d3.select(this).node(), tip); + }) + .on('mouseout', function() { + hideBootstrapTooltip(d3.select(this).node()); + }); + + if (batchInterval && batchInterval <= maxY) { + // Add the "stable" text to the graph below the batch interval line. + var stableXOffset = x(maxXForHistogram) - 20; + var stableYOffset = y(batchInterval) + 15; + svg.append("text") + .style("fill", "lightblue") + .attr("class", "stable-text") + .attr("text-anchor", "middle") + .attr("transform", "translate(" + stableXOffset + "," + stableYOffset + ")") + .text("stable") + .on('mouseover', function(d) { + var tip = "Processing Time <= Batch Interval (" + yValueFormat(batchInterval) +" " + unitY +")"; + showBootstrapTooltip(d3.select(this).node(), tip); + }) + .on('mouseout', function() { + hideBootstrapTooltip(d3.select(this).node()); + }); + } +} + +$(function() { + function getParameterFromURL(param) + { + var parameters = window.location.search.substring(1); // Remove "?" + var keyValues = parameters.split('&'); + for (var i = 0; i < keyValues.length; i++) + { + var paramKeyValue = keyValues[i].split('='); + if (paramKeyValue[0] == param) + { + return paramKeyValue[1]; + } + } + } + + if (getParameterFromURL("show-streams-detail") == "true") { + // Show the details for all InputDStream + $('#inputs-table').toggle('collapsed'); + $('#triangle').html('▼'); + } +}); diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 175140481e5ae..9c7f698840778 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -110,6 +110,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { .toArray } + def getInputStreamName(streamId: Int): Option[String] = synchronized { + inputStreams.find(_.id == streamId).map(_.name) + } + def generateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time " + time) val jobs = this.synchronized { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index e4ad4b509d8d8..9716adb62817c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -44,6 +44,11 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) /** This is an unique identifier for the input stream. */ val id = ssc.getNewInputStreamId() + /** + * The name of this InputDStream. By default, it's the class name with its id. + */ + private[streaming] def name: String = s"${getClass.getSimpleName}-$id" + /** * Checks whether the 'time' is valid wrt slideDuration for generating RDD. * Additionally it also ensures valid times are in strictly increasing order. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala index 52f08b9c9de68..de85f24dd988d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala @@ -32,6 +32,7 @@ case class ReceiverInfo( active: Boolean, location: String, lastErrorMessage: String = "", - lastError: String = "" + lastError: String = "", + lastErrorTime: Long = -1L ) { } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 3c341390eda39..f73f7e705ee0d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -155,10 +155,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false private def deregisterReceiver(streamId: Int, message: String, error: String) { val newReceiverInfo = receiverInfo.get(streamId) match { case Some(oldInfo) => - oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, lastError = error) + val lastErrorTime = + if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis() + oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, + lastError = error, lastErrorTime = lastErrorTime) case None => logWarning("No prior receiver info") - ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) + val lastErrorTime = + if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis() + ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, + lastError = error, lastErrorTime = lastErrorTime) } receiverInfo -= streamId listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo)) @@ -182,7 +188,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false oldInfo.copy(lastErrorMessage = message, lastError = error) case None => logWarning("No prior receiver info") - ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) + ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, + lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis()) } receiverInfo(streamId) = newReceiverInfo listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId))) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index e219e27785533..2960b528d4c5e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui import scala.xml.Node -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{UIUtils => SparkUIUtils} private[ui] abstract class BatchTableBase(tableId: String) { @@ -32,12 +32,12 @@ private[ui] abstract class BatchTableBase(tableId: String) { protected def baseRow(batch: BatchUIData): Seq[Node] = { val batchTime = batch.batchTime.milliseconds - val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds) + val formattedBatchTime = SparkUIUtils.formatDate(batch.batchTime.milliseconds) val eventCount = batch.numRecords val schedulingDelay = batch.schedulingDelay - val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-") + val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") val processingTime = batch.processingDelay - val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-") + val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-") @@ -107,7 +107,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData]) private def completedBatchRow(batch: BatchUIData): Seq[Node] = { val totalDelay = batch.totalDelay - val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-") + val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") baseRow(batch) ++ {formattedTotalDelay} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 2da9a29e2529e..3f1cab69068dc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -24,7 +24,7 @@ import scala.xml.{NodeSeq, Node} import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.streaming.Time -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage} import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId} import org.apache.spark.ui.jobs.UIData.JobUIData @@ -73,8 +73,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get). dropWhile(_.failureReason == None).take(1). // get the first info that contains failure flatMap(info => info.failureReason).headOption.getOrElse("") - val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-") - val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}" + val formattedDuration = duration.map(d => SparkUIUtils.formatDuration(d)).getOrElse("-") + val detailUrl = s"${SparkUIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}" // In the first row, output op id and its information needs to be shown. In other rows, these // cells will be taken up due to "rowspan". @@ -110,7 +110,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { { - UIUtils.makeProgressBar( + SparkUIUtils.makeProgressBar( started = sparkJob.numActiveTasks, completed = sparkJob.numCompletedTasks, failed = sparkJob.numFailedTasks, @@ -135,7 +135,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { // If any job does not finish, set "formattedOutputOpDuration" to "-" "-" } else { - UIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum) + SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum) } generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++ sparkJobs.tail.map { sparkJob => @@ -212,24 +212,24 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse { throw new IllegalArgumentException(s"Missing id parameter") } - val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds) + val formattedBatchTime = SparkUIUtils.formatDate(batchTime.milliseconds) val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse { throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist") } val formattedSchedulingDelay = - batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-") + batchUIData.schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") val formattedProcessingTime = - batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-") - val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-") + batchUIData.processingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") + val formattedTotalDelay = batchUIData.totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-") val summary: NodeSeq =