Merge branch 'master' of github.com:apache/spark into profiler
Conflicts:
	python/pyspark/worker.py
davies committed Sep 23, 2014
2 parents 116d52a + 729952a commit fb9565b
Showing 144 changed files with 3,923 additions and 2,383 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -1,9 +1,12 @@
 *~
+*.#*
+*#*#
 *.swp
 *.ipr
 *.iml
 *.iws
 .idea/
+.idea_modules/
 sbt/*.jar
 .settings
 .cache
@@ -16,6 +19,7 @@ third_party/libmesos.so
 third_party/libmesos.dylib
 conf/java-opts
 conf/*.sh
+conf/*.cmd
 conf/*.properties
 conf/*.conf
 conf/*.xml
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -20,6 +20,7 @@ log4j.properties.template
 metrics.properties.template
 slaves
 spark-env.sh
+spark-env.cmd
 spark-env.sh.template
 log4j-defaults.properties
 bootstrap-tooltip.js
@@ -58,3 +59,4 @@ dist/*
 .*iws
 logs
 .*scalastyle-output.xml
+.*dependency-reduced-pom.xml
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -8,5 +8,5 @@ submitting any copyrighted material via pull request, email, or other means
 you agree to license the material under the project's open source license and
 warrant that you have the legal authority to do so.
 
-Please see [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
+Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
 for more information.
4 changes: 3 additions & 1 deletion assembly/pom.xml
@@ -141,7 +141,9 @@
             <include>com.google.common.**</include>
           </includes>
           <excludes>
-            <exclude>com.google.common.base.Optional**</exclude>
+            <exclude>com/google/common/base/Absent*</exclude>
+            <exclude>com/google/common/base/Optional*</exclude>
+            <exclude>com/google/common/base/Present*</exclude>
           </excludes>
         </relocation>
       </relocations>
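Note on the new excludes: Guava's `Optional` is an abstract class whose factory methods return the package-private subclasses `Absent` and `Present`, so all three must be kept out of the relocation together; excluding only `Optional` would leave its subclasses renamed and trigger `NoClassDefFoundError` at runtime. A quick sketch of the relationship (assumes Guava on the classpath; not part of the commit):

```scala
import com.google.common.base.Optional

// Optional's factories return its concrete subclasses, so Optional,
// Absent, and Present must all stay under their original names.
val present = Optional.of("value")
val absent = Optional.absent[String]()
println(present.getClass.getName) // com.google.common.base.Present
println(absent.getClass.getName)  // com.google.common.base.Absent
```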
2 changes: 1 addition & 1 deletion bin/spark-sql
@@ -24,7 +24,7 @@
 set -o posix
 
 CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-CLASS_NOT_FOUND_EXIT_STATUS=1
+CLASS_NOT_FOUND_EXIT_STATUS=101
 
 # Figure out where Spark is installed
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
2 changes: 2 additions & 0 deletions core/pom.xml
@@ -343,7 +343,9 @@
           <filter>
             <artifact>com.google.guava:guava</artifact>
             <includes>
+              <include>com/google/common/base/Absent*</include>
               <include>com/google/common/base/Optional*</include>
+              <include>com/google/common/base/Present*</include>
             </includes>
           </filter>
         </filters>
32 changes: 22 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
-  def setCallSite(site: String) {
-    setLocalProperty("externalCallSite", site)
+  def setCallSite(shortCallSite: String) {
+    setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
+   */
+  private[spark] def setCallSite(callSite: CallSite) {
+    setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
+    setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
+  }
+
+  /**
+   * Clear the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
   def clearCallSite() {
-    setLocalProperty("externalCallSite", null)
+    setLocalProperty(CallSite.SHORT_FORM, null)
+    setLocalProperty(CallSite.LONG_FORM, null)
   }
 
   /**
    * Capture the current user callsite and return a formatted version for printing. If the user
-   * has overridden the call site, this will return the user's version.
+   * has overridden the call site using `setCallSite()`, this will return the user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, longForm = "")
-      case None => Utils.getCallSite
-    }
+    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
+      val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
+      CallSite(shortCallSite, longCallSite)
+    }.getOrElse(Utils.getCallSite())
   }
 
   /**
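For context, `CallSite.SHORT_FORM` and `CallSite.LONG_FORM` are thread-local property keys: the short form is the one surfaced in places like the web UI, while the long form carries the fuller stack trace. A usage sketch against the API above (assumes an existing `SparkContext` named `sc`; not part of the commit):

```scala
// The override is thread-local, so it only affects jobs submitted
// from this thread; clearCallSite() restores automatic capture.
sc.setCallSite("myLib.train()")
try {
  sc.parallelize(1 to 100).count() // reported call site: "myLib.train()"
} finally {
  sc.clearCallSite()
}
```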
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         }
       }
     } else {
-      logWarning ("No need to commit output of task: " + taID.value)
+      logInfo ("No need to commit output of task: " + taID.value)
     }
   }
 
31 changes: 25 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -775,17 +775,36 @@ private[spark] object PythonRDD extends Logging {
     }.toJavaRDD()
   }
 
+  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+    private val pickle = new Pickler()
+    private var batch = 1
+    private val buffer = new mutable.ArrayBuffer[Any]
+
+    override def hasNext(): Boolean = iter.hasNext
+
+    override def next(): Array[Byte] = {
+      while (iter.hasNext && buffer.length < batch) {
+        buffer += iter.next()
+      }
+      val bytes = pickle.dumps(buffer.toArray)
+      val size = bytes.length
+      // let 1M < size < 10M
+      if (size < 1024 * 1024) {
+        batch *= 2
+      } else if (size > 1024 * 1024 * 10 && batch > 1) {
+        batch /= 2
+      }
+      buffer.clear()
+      bytes
+    }
+  }
+
   /**
    * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
    * PySpark.
    */
   def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
-    jRDD.rdd.mapPartitions { iter =>
-      val pickle = new Pickler
-      iter.map { row =>
-        pickle.dumps(row)
-      }
-    }
+    jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
   }
 
   /**
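The `AutoBatchedPickler` replaces one-object-at-a-time pickling with an adaptive batch: it doubles the batch while a serialized batch stays under 1 MB and halves it above 10 MB, giving good throughput for small elements without risking huge buffers for large ones. A standalone sketch of just the feedback rule (hypothetical sizes; no Pyrolite dependency):

```scala
// Isolated batch-size controller: double under 1 MB, halve over 10 MB.
var batch = 1
def nextBatch(serializedSize: Int): Int = {
  if (serializedSize < 1024 * 1024) batch *= 2
  else if (serializedSize > 1024 * 1024 * 10 && batch > 1) batch /= 2
  batch
}

// With small elements the batch ramps 2, 4, 8, ... until a batch
// serializes to over 1 MB, then holds steady.
println(Seq(4096, 40960, 409600, 4 << 20).map(nextBatch)) // List(2, 4, 8, 8)
```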
core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -68,8 +68,8 @@ private[python] object SerDeUtil extends Logging {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
val data: String = args(1).asInstanceOf[String]
construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
construct(typecode, machineCodes(typecode), data)
} else {
super.construct(args)
}
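The `getBytes("ISO-8859-1")` trick works because ISO-8859-1 maps bytes 0–255 one-to-one onto the first 256 Unicode code points, so raw bytes smuggled through a Java `String` survive the round trip intact; UTF-8 would corrupt any byte ≥ 0x80. A quick sketch (not part of the commit):

```scala
// ISO-8859-1 is a bijection between bytes 0-255 and U+0000..U+00FF,
// so bytes -> String -> bytes is lossless.
val raw = Array[Byte](0, 65, -1, -128) // includes non-ASCII bytes
val viaString = new String(raw, "ISO-8859-1").getBytes("ISO-8859-1")
assert(raw.sameElements(viaString))
```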
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -54,7 +54,7 @@ object SparkSubmit {
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
 
-  private val CLASS_NOT_FOUND_EXIT_STATUS = 1
+  private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
   // Exposed for testing
   private[spark] var exitFn: () => Unit = () => System.exit(-1)
@@ -172,7 +172,7 @@ object SparkSubmit {
       // All cluster managers
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
-      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
+      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -183,6 +183,7 @@
sysProp = "spark.driver.extraLibraryPath"),

// Standalone cluster only
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

@@ -261,7 +262,7 @@
     }
 
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
-    if (clusterManager == YARN && deployMode == CLUSTER) {
+    if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.primaryResource != SPARK_INTERNAL) {
         childArgs += ("--jar", args.primaryResource)
@@ -279,7 +280,7 @@
     }
 
     // Read from default spark properties, if any
-    for ((k, v) <- args.getDefaultSparkProperties) {
+    for ((k, v) <- args.defaultSparkProperties) {
       sysProps.getOrElseUpdate(k, v)
     }
 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -57,12 +57,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var pyFiles: String = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
 
-  parseOpts(args.toList)
-  mergeSparkProperties()
-  checkRequiredArguments()
-
-  /** Return default present in the currently defined defaults file. */
-  def getDefaultSparkProperties = {
+  /** Default properties present in the currently defined defaults file. */
+  lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
     if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
     Option(propertiesFile).foreach { filename =>
@@ -79,6 +75,14 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }
 
+  // Respect SPARK_*_MEMORY for cluster mode
+  driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
+  executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
+
+  parseOpts(args.toList)
+  mergeSparkProperties()
+  checkRequiredArguments()
+
   /**
    * Fill in any undefined values based on the default properties file or options passed in through
    * the '--conf' flag.
@@ -107,7 +111,8 @@
     }
   }
 
-  val properties = getDefaultSparkProperties
+  val properties = HashMap[String, String]()
+  properties.putAll(defaultSparkProperties)
   properties.putAll(sparkProperties)
 
   // Use properties file as fallback for values which have a direct analog to
@@ -213,7 +218,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       |  verbose $verbose
       |
       |Default properties from $propertiesFile:
-      |${getDefaultSparkProperties.mkString(" ", "\n ", "\n")}
+      |${defaultSparkProperties.mkString(" ", "\n ", "\n")}
     """.stripMargin
   }
 
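Making the defaults a `lazy val` matters for initialization order: the constructor now assigns `driverMemory`/`executorMemory` from the environment and runs `parseOpts` before anything forces `defaultSparkProperties`, so the defaults are read with `propertiesFile` and `verbose` already populated. A minimal sketch of the hazard with a hypothetical class (not Spark's):

```scala
// In a Scala class body, statements run top to bottom. An eager val
// computed from `config` before parsing would see null; the lazy val
// defers evaluation until first use, after parsing has happened.
class Args(argv: Seq[String]) {
  var config: String = null

  lazy val defaults: Map[String, String] =
    Option(config).map(c => Map("source" -> c)).getOrElse(Map.empty)

  argv.foreach { a => if (a.startsWith("--config=")) config = a.drop("--config=".length) }

  println(defaults) // Map(source -> /etc/app.conf)
}

new Args(Seq("--config=/etc/app.conf"))
```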
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -67,6 +67,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
   }
 
   private val appHeader = Seq(
+    "App ID",
     "App Name",
     "Started",
     "Completed",
@@ -81,7 +82,8 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
     val duration = UIUtils.formatDuration(info.endTime - info.startTime)
     val lastUpdated = UIUtils.formatDate(info.lastUpdated)
     <tr>
-      <td><a href={uiAddress}>{info.name}</a></td>
+      <td><a href={uiAddress}>{info.id}</a></td>
+      <td>{info.name}</td>
       <td>{startTime}</td>
       <td>{endTime}</td>
       <td>{duration}</td>
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -489,23 +489,24 @@ private[spark] class Master(
     // First schedule drivers, they take strict precedence over applications
     // Randomization helps balance drivers
     val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
-    val aliveWorkerNum = shuffledAliveWorkers.size
+    val numWorkersAlive = shuffledAliveWorkers.size
     var curPos = 0
 
     for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
       // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
       // start from the last worker that was assigned a driver, and continue onwards until we have
       // explored all alive workers.
-      curPos = (curPos + 1) % aliveWorkerNum
-      val startPos = curPos
       var launched = false
-      while (curPos != startPos && !launched) {
+      var numWorkersVisited = 0
+      while (numWorkersVisited < numWorkersAlive && !launched) {
         val worker = shuffledAliveWorkers(curPos)
+        numWorkersVisited += 1
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
           launched = true
         }
-        curPos = (curPos + 1) % aliveWorkerNum
+        curPos = (curPos + 1) % numWorkersAlive
       }
     }
 
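The rewritten loop fixes the termination condition: the old code advanced `curPos`, saved it as `startPos`, and then tested `curPos != startPos` before the first iteration, so the `while` body never ran. Counting visited workers instead makes each driver consider every alive worker exactly once while still resuming from where the previous driver left off. A self-contained sketch of the fixed strategy (simplified types, not Spark's):

```scala
// Round-robin placement over a shuffled worker list: each pending job
// inspects at most workers.length workers, resuming where the last
// job stopped so load spreads across the cluster.
case class Worker(name: String, var freeMem: Int)

def place(jobs: Seq[Int], workers: IndexedSeq[Worker]): Unit = {
  var curPos = 0
  for (mem <- jobs) {
    var launched = false
    var visited = 0
    while (visited < workers.length && !launched) {
      val w = workers(curPos)
      visited += 1
      if (w.freeMem >= mem) {
        w.freeMem -= mem
        println(s"job($mem MB) -> ${w.name}")
        launched = true
      }
      curPos = (curPos + 1) % workers.length
    }
    if (!launched) println(s"job($mem MB) waits: no worker fits")
  }
}

place(Seq(512, 512, 2048), Vector(Worker("w1", 1024), Worker("w2", 1024)))
```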
16 changes: 11 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,6 +24,7 @@ import java.util.concurrent._

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -375,12 +376,17 @@ private[spark] class Executor(
       }
 
       val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
-      val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
-        retryAttempts, retryIntervalMs, timeout)
-      if (response.reregisterBlockManager) {
-        logWarning("Told to re-register on heartbeat")
-        env.blockManager.reregister()
+      try {
+        val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
+          retryAttempts, retryIntervalMs, timeout)
+        if (response.reregisterBlockManager) {
+          logWarning("Told to re-register on heartbeat")
+          env.blockManager.reregister()
+        }
+      } catch {
+        case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
       }
 
       Thread.sleep(interval)
     }
   }
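Wrapping the ask in `try`/`catch` with `NonFatal` keeps a transient driver hiccup (a timeout, a brief disconnect) from throwing out of the heartbeat thread and killing it, while still letting genuinely fatal errors such as `OutOfMemoryError` propagate. A minimal sketch of the pattern (not Spark's code):

```scala
import scala.util.control.NonFatal

// A periodic loop that survives transient failures. NonFatal deliberately
// does not match VirtualMachineError, InterruptedException, etc., so
// truly fatal conditions still escape the loop.
def heartbeatLoop(send: () => Unit, intervalMs: Long): Unit = {
  while (!Thread.currentThread().isInterrupted) {
    try {
      send() // may time out or fail while the peer is unreachable
    } catch {
      case NonFatal(t) => println(s"heartbeat failed, will retry: $t")
    }
    Thread.sleep(intervalMs)
  }
}
```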
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -137,7 +137,6 @@ class TaskMetrics extends Serializable {
       merged.localBlocksFetched += depMetrics.localBlocksFetched
       merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
       merged.remoteBytesRead += depMetrics.remoteBytesRead
-      merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
     }
     _shuffleReadMetrics = Some(merged)
   }
@@ -177,11 +176,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
  */
 @DeveloperApi
 class ShuffleReadMetrics extends Serializable {
-  /**
-   * Absolute time when this task finished reading shuffle data
-   */
-  var shuffleFinishTime: Long = -1
-
   /**
    * Number of blocks fetched in this shuffle by this task (remote or local)
    */
core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -460,7 +460,7 @@ private[spark] class ReceivingConnection(
     if (currId != null) currId else super.getRemoteConnectionManagerId()
   }
 
-  // The reciever's remote address is the local socket on remote side : which is NOT
+  // The receiver's remote address is the local socket on remote side : which is NOT
   // the connection manager id of the receiver.
   // We infer that from the messages we receive on the receiver socket.
   private def processConnectionManagerId(header: MessageChunkHeader) {
(The remaining changed files in this commit are not shown.)