
Commit

Merge branch 'master' of github.com:apache/spark into unsafe_join
Conflicts:
	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
Davies Liu committed Jul 21, 2015
2 parents 0f4380d + 7f072c3 commit 68f5cd9
Showing 27 changed files with 1,375 additions and 321 deletions.
@@ -19,31 +19,63 @@
* to be registered after the page loads. */
$(function() {
$("span.expand-additional-metrics").click(function(){
var status = window.localStorage.getItem("expand-additional-metrics") == "true";
status = !status;

// Expand the list of additional metrics.
var additionalMetricsDiv = $(this).parent().find('.additional-metrics');
$(additionalMetricsDiv).toggleClass('collapsed');

// Switch the class of the arrow from open to closed.
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-open');
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed');

window.localStorage.setItem("expand-additional-metrics", "" + status);
});

if (window.localStorage.getItem("expand-additional-metrics") == "true") {
// Set it to false so that the click function can revert it
window.localStorage.setItem("expand-additional-metrics", "false");
$("span.expand-additional-metrics").trigger("click");
}

stripeSummaryTable();

$('input[type="checkbox"]').click(function() {
var column = "table ." + $(this).attr("name");
var name = $(this).attr("name")
var column = "table ." + name;
var status = window.localStorage.getItem(name) == "true";
status = !status;
$(column).toggle();
stripeSummaryTable();
window.localStorage.setItem(name, "" + status);
});

$("#select-all-metrics").click(function() {
var status = window.localStorage.getItem("select-all-metrics") == "true";
status = !status;
if (this.checked) {
// Toggle all un-checked options.
$('input[type="checkbox"]:not(:checked)').trigger('click');
} else {
// Toggle all checked options.
$('input[type="checkbox"]:checked').trigger('click');
}
window.localStorage.setItem("select-all-metrics", "" + status);
});

if (window.localStorage.getItem("select-all-metrics") == "true") {
$("#select-all-metrics").attr('checked', status);
}

$("span.additional-metric-title").parent().find('input[type="checkbox"]').each(function() {
var name = $(this).attr("name")
// If name is undefined, then skip it because it's the "select-all-metrics" checkbox
if (name && window.localStorage.getItem(name) == "true") {
// Set it to false so that the click function can revert it
window.localStorage.setItem(name, "false");
$(this).trigger("click")
}
});

// Trigger a click on the checkbox if a user clicks the label next to it.
@@ -72,13 +72,24 @@ var StagePageVizConstants = {
rankSep: 40
};

/*
* Return "expand-dag-viz-arrow-job" if forJob is true.
* Otherwise, return "expand-dag-viz-arrow-stage".
*/
function expandDagVizArrowKey(forJob) {
return forJob ? "expand-dag-viz-arrow-job" : "expand-dag-viz-arrow-stage";
}

/*
* Show or hide the RDD DAG visualization.
*
* The graph is only rendered the first time this is called.
* This is the narrow interface called from the Scala UI code.
*/
function toggleDagViz(forJob) {
var status = window.localStorage.getItem(expandDagVizArrowKey(forJob)) == "true";
status = !status;

var arrowSelector = ".expand-dag-viz-arrow";
$(arrowSelector).toggleClass('arrow-closed');
$(arrowSelector).toggleClass('arrow-open');
@@ -93,8 +104,24 @@ function toggleDagViz(forJob) {
// Save the graph for later so we don't have to render it again
graphContainer().style("display", "none");
}

window.localStorage.setItem(expandDagVizArrowKey(forJob), "" + status);
}

$(function (){
if (window.localStorage.getItem(expandDagVizArrowKey(false)) == "true") {
// Set it to false so that the click function can revert it
window.localStorage.setItem(expandDagVizArrowKey(false), "false");
toggleDagViz(false);
}

if (window.localStorage.getItem(expandDagVizArrowKey(true)) == "true") {
// Set it to false so that the click function can revert it
window.localStorage.setItem(expandDagVizArrowKey(true), "false");
toggleDagViz(true);
}
});

/*
* Render the RDD DAG visualization.
*
@@ -66,14 +66,27 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) {
setupJobEventAction();

$("span.expand-application-timeline").click(function() {
var status = window.localStorage.getItem("expand-application-timeline") == "true";
status = !status;

$("#application-timeline").toggleClass('collapsed');

// Switch the class of the arrow from open to closed.
$(this).find('.expand-application-timeline-arrow').toggleClass('arrow-open');
$(this).find('.expand-application-timeline-arrow').toggleClass('arrow-closed');

window.localStorage.setItem("expand-application-timeline", "" + status);
});
}

$(function (){
if (window.localStorage.getItem("expand-application-timeline") == "true") {
// Set it to false so that the click function can revert it
window.localStorage.setItem("expand-application-timeline", "false");
$("span.expand-application-timeline").trigger('click');
}
});

function drawJobTimeline(groupArray, eventObjArray, startTime) {
var groups = new vis.DataSet(groupArray);
var items = new vis.DataSet(eventObjArray);
@@ -125,14 +138,27 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) {
setupStageEventAction();

$("span.expand-job-timeline").click(function() {
var status = window.localStorage.getItem("expand-job-timeline") == "true";
status = !status;

$("#job-timeline").toggleClass('collapsed');

// Switch the class of the arrow from open to closed.
$(this).find('.expand-job-timeline-arrow').toggleClass('arrow-open');
$(this).find('.expand-job-timeline-arrow').toggleClass('arrow-closed');

window.localStorage.setItem("expand-job-timeline", "" + status);
});
}

$(function (){
if (window.localStorage.getItem("expand-job-timeline") == "true") {
// Set it to false so that the click function can revert it
window.localStorage.setItem("expand-job-timeline", "false");
$("span.expand-job-timeline").trigger('click');
}
});

function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, maxFinishTime) {
var groups = new vis.DataSet(groupArray);
var items = new vis.DataSet(eventObjArray);
@@ -176,14 +202,27 @@ function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, ma
setupZoomable("#task-assignment-timeline-zoom-lock", taskTimeline);

$("span.expand-task-assignment-timeline").click(function() {
var status = window.localStorage.getItem("expand-task-assignment-timeline") == "true";
status = !status;

$("#task-assignment-timeline").toggleClass("collapsed");

// Switch the class of the arrow from open to closed.
$(this).find(".expand-task-assignment-timeline-arrow").toggleClass("arrow-open");
$(this).find(".expand-task-assignment-timeline-arrow").toggleClass("arrow-closed");

window.localStorage.setItem("expand-task-assignment-timeline", "" + status);
});
}

$(function (){
if (window.localStorage.getItem("expand-task-assignment-timeline") == "true") {
// Set it to false so that the click function can revert it
window.localStorage.setItem("expand-task-assignment-timeline", "false");
$("span.expand-task-assignment-timeline").trigger('click');
}
});

function setupExecutorEventAction() {
$(".item.box.executor").each(function () {
$(this).hover(
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,6 +21,7 @@ import java.io.Serializable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.metrics.source.Source
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.TaskCompletionListener

@@ -148,6 +149,14 @@ abstract class TaskContext extends Serializable {
@DeveloperApi
def taskMetrics(): TaskMetrics

/**
* ::DeveloperApi::
* Returns all metrics sources with the given name that are associated with the instance
* running the task. For more information see [[org.apache.spark.metrics.MetricsSystem!]].
*/
@DeveloperApi
def getMetricsSources(sourceName: String): Seq[Source]

/**
* Returns the manager for this task's managed memory.
*/
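As context for the new developer API, here is a minimal sketch of how executor-side code might look up a source by name. The job setup and the source name "MySource" are illustrative assumptions, not part of this commit; getMetricsSources simply returns an empty Seq when nothing matches.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object MetricsSourcesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("metrics-sources-sketch").setMaster("local[2]"))
    sc.parallelize(1 to 100, 4).foreachPartition { iter =>
      val n = iter.size // consumes the partition iterator
      // Hypothetical source name; yields Seq.empty unless such a source was registered.
      TaskContext.get().getMetricsSources("MySource").foreach { source =>
        // Each Source exposes a Codahale MetricRegistry to record into.
        source.metricRegistry.counter("records").inc(n)
      }
    }
    sc.stop()
  }
}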
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -20,6 +20,8 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}

@@ -29,6 +31,7 @@ private[spark] class TaskContextImpl(
override val taskAttemptId: Long,
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
@@ -95,6 +98,9 @@ private[spark] class TaskContextImpl(

override def isInterrupted(): Boolean = interrupted

override def getMetricsSources(sourceName: String): Seq[Source] =
metricsSystem.getSourcesByName(sourceName)

@transient private val accumulators = new HashMap[Long, Accumulable[_, _]]

private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = synchronized {
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -210,7 +210,10 @@ private[spark] class Executor(
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
val (value, accumUpdates) = try {
- task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
+ task.run(
+   taskAttemptId = taskId,
+   attemptNumber = attemptNumber,
+   metricsSystem = env.metricsSystem)
} finally {
// Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
// when changing this, make sure to update both copies.
@@ -142,6 +142,9 @@ private[spark] class MetricsSystem private (
} else { defaultName }
}

def getSourcesByName(sourceName: String): Seq[Source] =
sources.filter(_.sourceName == sourceName)

def registerSource(source: Source) {
sources += source
try {
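For reference, a Source is just a named handle on a Codahale MetricRegistry, so getSourcesByName is a plain filter over whatever has been registered. Below is a minimal sketch of a custom source such a lookup could find; the class and metric names are made up, and it assumes the source gets registered with the MetricsSystem (e.g. via registerSource above) first.

import com.codahale.metrics.MetricRegistry
import org.apache.spark.metrics.source.Source

// Illustrative custom source; once registered, getSourcesByName("MySource")
// (and, per the TaskContext change above, getMetricsSources("MySource")) finds it.
class MySource extends Source {
  override val sourceName: String = "MySource"
  override val metricRegistry: MetricRegistry = new MetricRegistry
  // A counter that task-side code can increment through the registry.
  val recordsCounter = metricRegistry.counter("records")
}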
@@ -682,6 +682,7 @@ class DAGScheduler(
taskAttemptId = 0,
attemptNumber = 0,
taskMemoryManager = taskMemoryManager,
metricsSystem = env.metricsSystem,
runningLocally = true)
TaskContext.setTaskContext(taskContext)
try {
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer

import scala.collection.mutable.HashMap

import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.{TaskContextImpl, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
@@ -61,13 +62,18 @@ private[spark] abstract class Task[T](
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @return the result of the task along with updates of Accumulators.
*/
- final def run(taskAttemptId: Long, attemptNumber: Int): (T, AccumulatorUpdates) = {
+ final def run(
+     taskAttemptId: Long,
+     attemptNumber: Int,
+     metricsSystem: MetricsSystem)
+   : (T, AccumulatorUpdates) = {
context = new TaskContextImpl(
stageId = stageId,
partitionId = partitionId,
taskAttemptId = taskAttemptId,
attemptNumber = attemptNumber,
taskMemoryManager = taskMemoryManager,
metricsSystem = metricsSystem,
runningLocally = false)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
@@ -169,9 +169,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

// Make fake resource offers on all executors
private def makeOffers() {
- launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
+ // Filter out executors under killing
+ val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
+ val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
- }.toSeq))
+ }.toSeq
+ launchTasks(scheduler.resourceOffers(workOffers))
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -181,9 +184,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
- val executorData = executorDataMap(executorId)
- launchTasks(scheduler.resourceOffers(
-   Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
+ // Filter out executors under killing
+ if (!executorsPendingToRemove.contains(executorId)) {
+   val executorData = executorDataMap(executorId)
+   val workOffers = Seq(
+     new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
+   launchTasks(scheduler.resourceOffers(workOffers))
+ }
}

// Launch tasks returned by a set of resource offers
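One side note on the pattern in these hunks: filterKeys on a Scala 2.x Map is a lazy view, so the predicate is re-evaluated on each traversal; the .map/.toSeq here forces it once per offer round, which is what the code intends. A self-contained sketch of the filtering, with toy stand-ins for the real WorkerOffer and ExecutorData types:

// Toy stand-ins for the scheduler types, only to make the sketch runnable.
case class ExecutorData(executorHost: String, freeCores: Int)
case class WorkerOffer(executorId: String, host: String, cores: Int)

object OfferFilterSketch {
  def main(args: Array[String]): Unit = {
    val executorDataMap = Map(
      "exec-1" -> ExecutorData("host-a", 4),
      "exec-2" -> ExecutorData("host-b", 2))
    val executorsPendingToRemove = Set("exec-2")

    // Lazy filterKeys view; forced once by the .toSeq below.
    val workOffers = executorDataMap
      .filterKeys(!executorsPendingToRemove.contains(_))
      .map { case (id, data) => WorkerOffer(id, data.executorHost, data.freeCores) }
      .toSeq

    println(workOffers) // List(WorkerOffer(exec-1,host-a,4))
  }
}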
(Remaining changed files not shown.)
