
Commit

Merge branch 'master' of github.com:apache/spark into sql-local-tests-cleanup

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
	sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
Andrew Or committed Sep 15, 2015
2 parents a93a260 + 09b7e7c commit 0030ba0
Showing 85 changed files with 1,774 additions and 287 deletions.
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
Package: SparkR
Type: Package
Title: R frontend for Spark
Version: 1.5.0
Version: 1.6.0
Date: 2013-09-09
Author: The Apache Software Foundation
Maintainer: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<version>1.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<version>1.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>1.5.0-SNAPSHOT</version>
<version>1.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

27 changes: 27 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
* Holds statistics about the output sizes in a map stage. May become a DeveloperApi in the future.
*
* @param shuffleId ID of the shuffle
* @param bytesByPartitionId approximate number of output bytes for each map output partition
* (may be inexact due to use of compressed map statuses)
*/
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
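
The scaladoc above describes per-partition output byte sizes, and the ActiveJob change later in this commit notes they exist so adaptive query planning can inspect map output sizes before later stages run. A minimal sketch of one way such statistics could be consumed, grouping small post-shuffle partitions up to a target size; the helper name and threshold are illustrative assumptions, not part of this commit:

// Hypothetical consumer of MapOutputStatistics.bytesByPartitionId: greedily pack
// consecutive reduce partitions into groups of roughly targetBytes each.
object PartitionCoalescingSketch {
  def planGroups(bytesByPartitionId: Array[Long], targetBytes: Long): Seq[Seq[Int]] = {
    val groups = scala.collection.mutable.ArrayBuffer.empty[Seq[Int]]
    var current = scala.collection.mutable.ArrayBuffer.empty[Int]
    var currentBytes = 0L
    for (partition <- bytesByPartitionId.indices) {
      current += partition
      currentBytes += bytesByPartitionId(partition)
      if (currentBytes >= targetBytes) {
        groups += current.toSeq
        current = scala.collection.mutable.ArrayBuffer.empty[Int]
        currentBytes = 0L
      }
    }
    if (current.nonEmpty) groups += current.toSeq
    groups.toSeq
  }
}

For example, sizes of Array(10L, 10L, 90L, 10L) with targetBytes = 50 produce Seq(Seq(0, 1, 2), Seq(3)).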
49 changes: 38 additions & 11 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io._
import java.util.Arrays
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

@@ -132,13 +133,43 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
val startTime = System.currentTimeMillis
val statuses = getStatuses(shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
}
}

/**
* Return statistics about all of the outputs for a given shuffle.
*/
def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
val statuses = getStatuses(dep.shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
val totalSizes = new Array[Long](dep.partitioner.numPartitions)
for (s <- statuses) {
for (i <- 0 until totalSizes.length) {
totalSizes(i) += s.getSizeForBlock(i)
}
}
new MapOutputStatistics(dep.shuffleId, totalSizes)
}
}

/**
* Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize
* on this array when reading it, because on the driver, we may be changing it in place.
*
* (It would be nice to remove this restriction in the future.)
*/
private def getStatuses(shuffleId: Int): Array[MapStatus] = {
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
val startTime = System.currentTimeMillis
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
// Someone else is fetching it; wait for them to be done
@@ -160,7 +191,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}

if (fetchedStatuses == null) {
// We won the race to fetch the output locs; do so
// We won the race to fetch the statuses; do so
logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
// This try-finally prevents hangs due to timeouts:
try {
@@ -175,22 +206,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}
}
logDebug(s"Fetching map output location for shuffle $shuffleId, reduce $reduceId took " +
logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
s"${System.currentTimeMillis - startTime} ms")

if (fetchedStatuses != null) {
fetchedStatuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
return fetchedStatuses
} else {
logError("Missing all output locations for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
}
return statuses
}
}

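The new getStatistics above sums each map output's reported size into a per-reduce-partition total. A self-contained sketch of that aggregation, using a stub in place of MapStatus (the real statuses come from the tracker; the stub is only for illustration):

// Stub standing in for MapStatus, which reports an approximate size per reduce partition.
case class StubMapStatus(sizes: Array[Long]) {
  def getSizeForBlock(reduceId: Int): Long = sizes(reduceId)
}

// Mirrors the loop in getStatistics: total output bytes per reduce partition across all maps.
def totalBytesPerPartition(statuses: Seq[StubMapStatus], numPartitions: Int): Array[Long] = {
  val totals = new Array[Long](numPartitions)
  for (s <- statuses; i <- 0 until numPartitions) {
    totals(i) += s.getSizeForBlock(i)
  }
  totals
}

Two map outputs of Array(1L, 2L) and Array(3L, 4L) give totals of Array(4L, 6L), one entry per reduce partition.
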
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1984,6 +1984,23 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
new SimpleFutureAction(waiter, resultFunc)
}

/**
* Submit a map stage for execution. This is currently an internal API only, but might be
* promoted to DeveloperApi in the future.
*/
private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
: SimpleFutureAction[MapOutputStatistics] = {
assertNotStopped()
val callSite = getCallSite()
var result: MapOutputStatistics = null
val waiter = dagScheduler.submitMapStage(
dependency,
(r: MapOutputStatistics) => { result = r },
callSite,
localProperties.get)
new SimpleFutureAction[MapOutputStatistics](waiter, result)
}

/**
* Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
* for more information.
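Because submitMapStage is private[spark], only code inside the org.apache.spark package can call it. A hedged sketch of how a map stage could be run ahead of time and its statistics awaited, assuming internal access and an illustrative pair RDD; this wrapper code is not part of the commit:

package org.apache.spark  // required for access to the private[spark] submitMapStage

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.rdd.RDD

object MapStageSketch {
  // Sketch only: run just the shuffle's map side and return its output statistics.
  def runMapStage(sc: SparkContext, pairs: RDD[(Int, Int)]): MapOutputStatistics = {
    val shuffled = pairs.partitionBy(new HashPartitioner(8))
    // A shuffled RDD's single dependency is the ShuffleDependency we want to pre-compute.
    val dep = shuffled.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int, Int]]
    val futureStats = sc.submitMapStage(dep)
    // SimpleFutureAction extends scala.concurrent.Future, so a plain Await works here.
    Await.result(futureStats, Duration.Inf)
  }
}
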
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/package.scala
@@ -43,5 +43,5 @@ package org.apache

package object spark {
// For package docs only
val SPARK_VERSION = "1.5.0-SNAPSHOT"
val SPARK_VERSION = "1.6.0-SNAPSHOT"
}
34 changes: 29 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -23,18 +23,42 @@ import org.apache.spark.TaskContext
import org.apache.spark.util.CallSite

/**
* Tracks information about an active job in the DAGScheduler.
* A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a
* ResultStage to execute an action, or a map-stage job, which computes the map outputs for a
* ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive
* query planning, to look at map output statistics before submitting later stages. We distinguish
* between these two types of jobs using the finalStage field of this class.
*
* Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's
* submitJob or submitMapStage methods. However, either type of job may cause the execution of
* other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of
* these previous stages. These dependencies are managed inside DAGScheduler.
*
* @param jobId A unique ID for this job.
* @param finalStage The stage that this job computes (either a ResultStage for an action or a
* ShuffleMapStage for submitMapStage).
* @param callSite Where this job was initiated in the user's program (shown on UI).
* @param listener A listener to notify if tasks in this job finish or the job fails.
* @param properties Scheduling properties attached to the job, such as fair scheduler pool name.
*/
private[spark] class ActiveJob(
val jobId: Int,
val finalStage: ResultStage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
val finalStage: Stage,
val callSite: CallSite,
val listener: JobListener,
val properties: Properties) {

val numPartitions = partitions.length
/**
* Number of partitions we need to compute for this job. Note that result stages may not need
* to compute all partitions in their target RDD, for actions like first() and lookup().
*/
val numPartitions = finalStage match {
case r: ResultStage => r.partitions.length
case m: ShuffleMapStage => m.rdd.partitions.length
}

/** Which partitions of the stage have finished */
val finished = Array.fill[Boolean](numPartitions)(false)

var numFinished = 0
}
(Diffs for the remaining changed files are not shown.)
