
Commit

Merge branch 'master' into SPARK-4194
Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
	core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
Marcelo Vanzin committed Apr 3, 2015
2 parents 8caa8b3 + 512a2f1 commit 3979aad
Showing 85 changed files with 833 additions and 279 deletions.
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
val out = new ByteArrayOutputStream
val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
objOut.writeObject(statuses)
Utils.tryWithSafeFinally {
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
objOut.writeObject(statuses)
}
} {
objOut.close()
}
objOut.close()
out.toByteArray
}

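Note on the recurring pattern in this commit: plain try/finally blocks around stream cleanup are replaced with Utils.tryWithSafeFinally from org.apache.spark.util.Utils. A minimal sketch of the call shape in the hunk above, assuming the helper takes a body block and a cleanup block as two separate argument lists and behaves like the version in Spark's Utils:

    Utils.tryWithSafeFinally {
      objOut.writeObject(statuses)   // body; may throw
    } {
      objOut.close()                 // cleanup; always runs
    }

The helper returns the body's value, and if the cleanup throws after the body has already failed, the secondary exception is recorded as suppressed rather than masking the original one, which a bare finally block would not do.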
19 changes: 16 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -328,6 +328,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
override protected def initialValue(): Properties = new Properties()
}

/* ------------------------------------------------------------------------------------- *
@@ -563,9 +564,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Spark fair scheduler pool.
*/
def setLocalProperty(key: String, value: String) {
if (localProperties.get() == null) {
localProperties.set(new Properties())
}
if (value == null) {
localProperties.get.remove(key)
} else {
@@ -1474,6 +1472,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (!stopped) {
stopped = true
postApplicationEnd()
<<<<<<< HEAD
_ui.foreach(_.stop())
if (env != null) {
env.metricsSystem.report()
@@ -1497,6 +1496,20 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
_progressBar.foreach(_.stop())
_taskScheduler = null
=======
ui.foreach(_.stop())
env.metricsSystem.report()
metadataCleaner.cancel()
cleaner.foreach(_.stop())
executorAllocationManager.foreach(_.stop())
dagScheduler.stop()
dagScheduler = null
listenerBus.stop()
eventLogger.foreach(_.stop())
env.actorSystem.stop(heartbeatReceiver)
progressBar.foreach(_.stop())
taskScheduler = null
>>>>>>> master
// TODO: Cache.stop()?
if (_env != null) {
_env.stop()
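Aside on the localProperties change above: overriding initialValue() to return an empty Properties means localProperties.get() can no longer return null, which is why the lazy-initialization guard at the top of setLocalProperty is dropped. A rough sketch of the resulting caller (the else branch is cut off in the hunk above and is assumed here to set the property):

    def setLocalProperty(key: String, value: String) {
      if (value == null) {
        localProperties.get.remove(key)            // safe: initialValue() guarantees non-null
      } else {
        localProperties.get.setProperty(key, value)
      }
    }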
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -661,7 +661,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.call(x).asScala
def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
}
core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -192,7 +192,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
*/
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x)
def fn: (T) => S = (x: T) => f.call(x)
import com.google.common.collect.Ordering // shadows scala.math.Ordering
implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
implicit val ctag: ClassTag[S] = fakeClassTag
53 changes: 34 additions & 19 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,8 +17,9 @@

package org.apache.spark.api.java

import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.{lang => jl}
import java.lang.{Iterable => JIterable, Long => JLong}
import java.util.{Comparator, List => JList, Iterator => JIterator}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -93,7 +94,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of the original partition.
*/
def mapPartitionsWithIndex[R](
f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]],
f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -109,7 +110,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
def cm = implicitly[ClassTag[(K2, V2)]]
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}

@@ -119,7 +120,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x).asScala
def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}

@@ -129,8 +130,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}

/**
@@ -139,16 +140,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.call(x).asScala
def cm = implicitly[ClassTag[(K2, V2)]]
def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}

/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}

@@ -157,7 +160,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
JavaRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -166,16 +171,20 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
}

/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}

@@ -184,7 +193,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
}
@@ -194,7 +205,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
}
JavaPairRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -277,8 +290,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def zipPartitions[U, V](
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
(x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
}
JavaRDD.fromRDD(
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
}
@@ -441,8 +456,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): java.util.Map[T, java.lang.Long] =
mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new java.lang.Long(x._2)))))
def countByValue(): java.util.Map[T, jl.Long] =
mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2)))))

/**
* (Experimental) Approximate version of countByValue().
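Aside on the JavaRDDLike changes above: the local fn adapters that bridge Java function interfaces to the Scala function types expected by RDD methods now carry explicit result types instead of relying on inference, and java.lang is imported as jl to shorten boxed-type references. As a standalone sketch of the bridging idea (a hypothetical helper, not code from this diff), assuming the 1.x-era FlatMapFunction whose call returns a java.lang.Iterable:

    import scala.collection.JavaConverters._
    import org.apache.spark.api.java.function.FlatMapFunction

    // Bridge a Java FlatMapFunction into the Scala function type rdd.flatMap expects.
    def asScalaFlatMap[T, U](f: FlatMapFunction[T, U]): T => Iterable[U] =
      (x: T) => f.call(x).asScala

Spelling out the function type documents what the underlying Scala RDD method receives and keeps a later change to the Java interface from silently shifting the inferred type.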
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -614,9 +614,9 @@ private[spark] object PythonRDD extends Logging {
try {
val sock = serverSocket.accept()
val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
try {
Utils.tryWithSafeFinally {
writeIteratorToStream(items, out)
} finally {
} {
out.close()
}
} catch {
@@ -862,9 +862,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
val file = File.createTempFile("broadcast", "", dir)
path = file.getAbsolutePath
val out = new FileOutputStream(file)
try {
Utils.tryWithSafeFinally {
Utils.copyStream(in, out)
} finally {
} {
out.close()
}
}
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -165,7 +165,7 @@ private[broadcast] object HttpBroadcast extends Logging {
private def write(id: Long, value: Any) {
val file = getFile(id)
val fileOutputStream = new FileOutputStream(file)
try {
Utils.tryWithSafeFinally {
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(fileOutputStream)
@@ -175,10 +175,13 @@
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
Utils.tryWithSafeFinally {
serOut.writeObject(value)
} {
serOut.close()
}
files += file
} finally {
} {
fileOutputStream.close()
}
}
@@ -212,9 +215,11 @@
}
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
val obj = serIn.readObject[T]()
serIn.close()
obj
Utils.tryWithSafeFinally {
serIn.readObject[T]()
} {
serIn.close()
}
}

/**
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -118,7 +118,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
if (!fs.exists(path)) {
var msg = s"Log directory specified does not exist: $logDir."
if (logDir == DEFAULT_LOG_DIR) {
msg += " Did you configure the correct one through spark.fs.history.logDirectory?"
msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
}
throw new IllegalArgumentException(msg)
}
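Aside on the FsHistoryProvider fix above: the old message pointed at a key that does not exist (spark.fs.history.logDirectory); the actual setting is spark.history.fs.logDirectory. For illustration only, a history server is typically paired with the application-side event log settings in conf/spark-defaults.conf (the HDFS path is just an example):

    spark.eventLog.enabled           true
    spark.eventLog.dir               hdfs:///shared/spark-logs
    spark.history.fs.logDirectory    hdfs:///shared/spark-logs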
core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
import akka.serialization.Serialization

import org.apache.spark.Logging
import org.apache.spark.util.Utils


/**
@@ -59,9 +60,9 @@ private[master] class FileSystemPersistenceEngine(
val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)
val out = new FileOutputStream(file)
try {
Utils.tryWithSafeFinally {
out.write(serialized)
} finally {
} {
out.close()
}
}
core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
import com.google.common.base.Charsets

import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.Utils

/**
* A client that submits applications to the standalone Master using a REST protocol.
@@ -148,8 +149,11 @@ private[deploy] class StandaloneRestClient extends Logging {
conn.setRequestProperty("charset", "utf-8")
conn.setDoOutput(true)
val out = new DataOutputStream(conn.getOutputStream)
out.write(json.getBytes(Charsets.UTF_8))
out.close()
Utils.tryWithSafeFinally {
out.write(json.getBytes(Charsets.UTF_8))
} {
out.close()
}
readResponse(conn)
}

8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils

private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}

@@ -112,8 +113,11 @@ private[spark] object CheckpointRDD extends Logging {
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
serializeStream.writeAll(iterator)
serializeStream.close()
Utils.tryWithSafeFinally {
serializeStream.writeAll(iterator)
} {
serializeStream.close()
}

if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
(Diffs for the remaining changed files were not loaded.)
