Commit 88e2f17

Merge branch 'master' of git://git.apache.org/spark into slaves-scripts-modification
sarutak committed Sep 22, 2014
2 parents 297e75d + fd0b32c commit 88e2f17
Showing 89 changed files with 2,232 additions and 1,533 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -1,9 +1,12 @@
*~
*.#*
*#*#
*.swp
*.ipr
*.iml
*.iws
.idea/
.idea_modules/
sbt/*.jar
.settings
.cache
@@ -16,6 +19,7 @@ third_party/libmesos.so
third_party/libmesos.dylib
conf/java-opts
conf/*.sh
conf/*.cmd
conf/*.properties
conf/*.conf
conf/*.xml
1 change: 1 addition & 0 deletions .rat-excludes
@@ -21,6 +21,7 @@ metrics.properties.template
slaves
slaves.template
spark-env.sh
spark-env.cmd
spark-env.sh.template
log4j-defaults.properties
bootstrap-tooltip.js
2 changes: 1 addition & 1 deletion bin/spark-sql
@@ -24,7 +24,7 @@
set -o posix

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
CLASS_NOT_FOUND_EXIT_STATUS=1
CLASS_NOT_FOUND_EXIT_STATUS=101

# Figure out where Spark is installed
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
}
} else {
logWarning ("No need to commit output of task: " + taID.value)
logInfo ("No need to commit output of task: " + taID.value)
}
}

31 changes: 25 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -775,17 +775,36 @@ private[spark] object PythonRDD extends Logging {
}.toJavaRDD()
}

  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
    private val pickle = new Pickler()
    private var batch = 1
    private val buffer = new mutable.ArrayBuffer[Any]

    override def hasNext(): Boolean = iter.hasNext

    override def next(): Array[Byte] = {
      while (iter.hasNext && buffer.length < batch) {
        buffer += iter.next()
      }
      val bytes = pickle.dumps(buffer.toArray)
      val size = bytes.length
      // let 1M < size < 10M
      if (size < 1024 * 1024) {
        batch *= 2
      } else if (size > 1024 * 1024 * 10 && batch > 1) {
        batch /= 2
      }
      buffer.clear()
      bytes
    }
  }

  /**
   * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
   * PySpark.
   */
  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
    jRDD.rdd.mapPartitions { iter =>
      val pickle = new Pickler
      iter.map { row =>
        pickle.dumps(row)
      }
    }
    jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
  }
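The AutoBatchedPickler added above grows the batch while the pickled payload stays under 1 MB and shrinks it once it exceeds 10 MB. A minimal stand-alone sketch of that sizing rule; the object name, `nextBatch`, and the ~8 KB per-record size are made up for illustration and are not part of this patch:

```scala
// Illustrative only: the doubling/halving rule used by AutoBatchedPickler,
// extracted so it can be exercised without Spark or a Pickler.
object BatchSizingSketch {
  def nextBatch(batch: Int, pickledSize: Int): Int = {
    if (pickledSize < 1024 * 1024) batch * 2                        // payload small: batch more rows
    else if (pickledSize > 1024 * 1024 * 10 && batch > 1) batch / 2 // payload large: batch fewer rows
    else batch                                                      // already inside the 1-10 MB window
  }

  def main(args: Array[String]): Unit = {
    var batch = 1
    for (_ <- 1 to 12) {
      val approxPickledSize = batch * 8 * 1024 // pretend each pickled record is ~8 KB
      println(s"batch=$batch -> ~$approxPickledSize bytes")
      batch = nextBatch(batch, approxPickledSize)
    }
  }
}
```

Starting from a single record per batch, the batch size doubles until the payload lands in the 1-10 MB window and then stays there.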

@@ -68,8 +68,8 @@ private[python] object SerDeUtil extends Logging {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
val data: String = args(1).asInstanceOf[String]
construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
construct(typecode, machineCodes(typecode), data)
} else {
super.construct(args)
}
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -54,7 +54,7 @@ object SparkSubmit {
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

private val CLASS_NOT_FOUND_EXIT_STATUS = 1
private val CLASS_NOT_FOUND_EXIT_STATUS = 101

// Exposed for testing
private[spark] var exitFn: () => Unit = () => System.exit(-1)
@@ -172,7 +172,7 @@ object SparkSubmit {
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -183,6 +183,7 @@
sysProp = "spark.driver.extraLibraryPath"),

// Standalone cluster only
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

@@ -261,7 +262,7 @@ object SparkSubmit {
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
Expand All @@ -279,7 +280,7 @@ object SparkSubmit {
}

// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
for ((k, v) <- args.defaultSparkProperties) {
sysProps.getOrElseUpdate(k, v)
}

@@ -57,12 +57,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
var pyFiles: String = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

parseOpts(args.toList)
mergeSparkProperties()
checkRequiredArguments()

/** Return default present in the currently defined defaults file. */
def getDefaultSparkProperties = {
/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
@@ -79,6 +75,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
defaultProperties
}

parseOpts(args.toList)
mergeSparkProperties()
checkRequiredArguments()

/**
* Fill in any undefined values based on the default properties file or options passed in through
* the '--conf' flag.
@@ -107,7 +107,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
}

val properties = getDefaultSparkProperties
val properties = HashMap[String, String]()
properties.putAll(defaultSparkProperties)
properties.putAll(sparkProperties)

// Use properties file as fallback for values which have a direct analog to
@@ -213,7 +214,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| verbose $verbose
|
|Default properties from $propertiesFile:
|${getDefaultSparkProperties.mkString(" ", "\n ", "\n")}
|${defaultSparkProperties.mkString(" ", "\n ", "\n")}
""".stripMargin
}
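In the hunks above, `getDefaultSparkProperties` (a `def`, re-evaluated on every call) becomes the cached `lazy val defaultSparkProperties`. A small sketch of that difference, assuming that reading the defaults file is the expensive step; the names below are hypothetical and are not Spark code:

```scala
// Illustrative only: a def re-runs its body on every access, a lazy val runs it once.
object LazyValVsDef {
  private var fileReads = 0
  private def readPropertiesFile(): Map[String, String] = {
    fileReads += 1                          // stand-in for opening and parsing the defaults file
    Map("spark.master" -> "local[*]")
  }

  def propsAsDef: Map[String, String] = readPropertiesFile()          // parsed on every call
  lazy val propsAsLazyVal: Map[String, String] = readPropertiesFile() // parsed at most once

  def main(args: Array[String]): Unit = {
    propsAsDef; propsAsDef
    propsAsLazyVal; propsAsLazyVal
    println(s"file reads: $fileReads")      // 3 = two for the def, one for the lazy val
  }
}
```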

@@ -137,7 +137,6 @@ class TaskMetrics extends Serializable {
merged.localBlocksFetched += depMetrics.localBlocksFetched
merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
merged.remoteBytesRead += depMetrics.remoteBytesRead
merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
}
_shuffleReadMetrics = Some(merged)
}
@@ -177,11 +176,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
*/
@DeveloperApi
class ShuffleReadMetrics extends Serializable {
/**
* Absolute time when this task finished reading shuffle data
*/
var shuffleFinishTime: Long = -1

/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/

@@ -460,7 +460,7 @@ private[spark] class ReceivingConnection(
if (currId != null) currId else super.getRemoteConnectionManagerId()
}

// The reciever's remote address is the local socket on remote side : which is NOT
// The receiver's remote address is the local socket on remote side : which is NOT
// the connection manager id of the receiver.
// We infer that from the messages we receive on the receiver socket.
private def processConnectionManagerId(header: MessageChunkHeader) {

@@ -501,7 +501,7 @@ private[nio] class ConnectionManager(

def changeConnectionKeyInterest(connection: Connection, ops: Int) {
keyInterestChangeRequests += ((connection.key, ops))
// so that registerations happen !
// so that registrations happen !
wakeupSelector()
}

@@ -832,7 +832,7 @@ private[nio] class ConnectionManager(
}

/**
* Send a message and block until an acknowldgment is received or an error occurs.
* Send a message and block until an acknowledgment is received or an error occurs.
* @param connectionManagerId the message's destination
* @param message the message being sent
* @return a Future that either returns the acknowledgment message or captures an exception.

@@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])

// Use configured output committer if already set
if (conf.getOutputCommitter == null) {
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}

FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
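The hunk above makes saveAsHadoopFile keep an output committer that is already configured on the job instead of always overwriting it with FileOutputCommitter. A hedged usage sketch under that reading of the change; `DirectOutputCommitter`, `CustomCommitterExample`, and `save` are placeholders for illustration, not part of this patch or of Spark:

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{FileOutputCommitter, JobConf, TextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // rddToPairRDDFunctions for saveAsHadoopFile

// A trivial stand-in for a custom committer (e.g. one that commits output differently).
class DirectOutputCommitter extends FileOutputCommitter

object CustomCommitterExample {
  // Hypothetical usage: a committer configured on the JobConf before the save
  // should no longer be replaced by FileOutputCommitter inside saveAsHadoopFile.
  def save(sc: SparkContext, path: String): Unit = {
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.setOutputCommitter(classOf[DirectOutputCommitter])
    sc.parallelize(Seq(("key", "value")))
      .saveAsHadoopFile(path, classOf[Text], classOf[Text],
        classOf[TextOutputFormat[Text, Text]], jobConf)
  }
}
```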

@@ -171,7 +171,6 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
}
val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -255,7 +255,6 @@ private[spark] object JsonProtocol {
}

def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
@@ -590,7 +589,6 @@ private[spark] object JsonProtocol {

def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
val metrics = new ShuffleReadMetrics
metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]

@@ -154,6 +154,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
sysProps("spark.app.name") should be ("beauty")
sysProps("spark.shuffle.spill") should be ("false")
sysProps("SPARK_SUBMIT") should be ("true")
sysProps.keys should not contain ("spark.jars")
}

test("handles YARN client mode") {