
Merge ../incubator-spark into iterator-to-disk

commit 8aa31cdf94981887fcbc5c7db79a4f2d310dcb59 (2 parents: 46dff34 + 33ac390), committed by @kellrott on Feb 28, 2014
@@ -71,10 +71,21 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
- val elements = new ArrayBuffer[Any]
- elements ++= computedValues
- blockManager.put(key, elements, storageLevel, tellMaster = true)
- elements.iterator.asInstanceOf[Iterator[T]]
+ if (storageLevel.useDisk && !storageLevel.useMemory) {
+ blockManager.put(key, computedValues, storageLevel, tellMaster = true)
+ return blockManager.get(key) match {
+ case Some(values) =>
+ return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+ case None =>
+ logInfo("Failure to store %s".format(key))
+ return null
+ }
+ } else {
+ val elements = new ArrayBuffer[Any]
+ elements ++= computedValues
+ blockManager.put(key, elements, storageLevel, tellMaster = true)
+ return elements.iterator.asInstanceOf[Iterator[T]]
+ }
} finally {
loading.synchronized {
loading.remove(key)
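
The CacheManager change above is the core of this branch: when a block's storage level is disk-only, the computed iterator is handed straight to the block manager and then read back, instead of first being drained into an ArrayBuffer. A minimal standalone sketch of that pattern, using a hypothetical simplified Store trait rather than Spark's real BlockManager API:

import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified interface; not Spark's BlockManager.
trait Store {
  def putIterator(key: String, values: Iterator[Any]): Unit   // can stream to disk
  def putArray(key: String, values: ArrayBuffer[Any]): Unit   // keeps values in memory
  def get(key: String): Option[Iterator[Any]]
}

// Disk-only levels stream the iterator; memory levels still materialize it,
// because in-memory storage and size estimation need the whole collection.
def cacheAndRead(store: Store, key: String, computed: Iterator[Any],
                 useDisk: Boolean, useMemory: Boolean): Iterator[Any] = {
  if (useDisk && !useMemory) {
    store.putIterator(key, computed)          // never holds the full partition in memory
    store.get(key).getOrElse(Iterator.empty)  // read the block back after the put
  } else {
    val elements = new ArrayBuffer[Any]
    elements ++= computed
    store.putArray(key, elements)
    elements.iterator
  }
}
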
@@ -23,9 +23,27 @@ import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.util.ByteBufferInputStream
-private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
+private[spark] class JavaSerializationStream(out: OutputStream,
+ conf: SparkConf) extends SerializationStream {
val objOut = new ObjectOutputStream(out)
- def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
+ var counter = 0
+ val counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000)
+
+ /* Calling reset to avoid memory leaks:
+ * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
+ * But only call it every counterReset-th write (10,000 by default) to avoid bloated
+ * serialization streams: when the stream 'resets', object class descriptions have to
+ * be re-written.
+ */
+ def writeObject[T](t: T): SerializationStream = {
+ objOut.writeObject(t)
+ if (counterReset > 0 && counter >= counterReset) {
+ objOut.reset()
+ counter = 0
+ } else {
+ counter += 1
+ }
+ this
+ }
def flush() { objOut.flush() }
def close() { objOut.close() }
}
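
For context on the counter added to JavaSerializationStream: java.io.ObjectOutputStream keeps a reference to every object it has written so it can emit back-references, which grows without bound on a long-lived stream; reset() clears that table at the cost of re-writing class descriptors. A standalone sketch of the same trick, with an arbitrary threshold in place of Spark's spark.serializer.objectStreamReset setting:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val resetEvery = 10000                      // arbitrary threshold for this sketch
val out = new ObjectOutputStream(new ByteArrayOutputStream())
var written = 0
for (i <- 0 until 100000) {
  out.writeObject(java.lang.Integer.valueOf(i))
  written += 1
  if (written >= resetEvery) {
    out.reset()                             // drop the handle table so written objects can be GC'd
    written = 0
  }
}
out.close()
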
@@ -41,7 +59,7 @@ extends DeserializationStream {
def close() { objIn.close() }
}
-private[spark] class JavaSerializerInstance extends SerializerInstance {
+private[spark] class JavaSerializerInstance(conf: SparkConf) extends SerializerInstance {
def serialize[T](t: T): ByteBuffer = {
val bos = new ByteArrayOutputStream()
val out = serializeStream(bos)
@@ -63,7 +81,7 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
}
def serializeStream(s: OutputStream): SerializationStream = {
- new JavaSerializationStream(s)
+ new JavaSerializationStream(s, conf)
}
def deserializeStream(s: InputStream): DeserializationStream = {
@@ -79,5 +97,5 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
* A Spark serializer that uses Java's built-in serialization.
*/
class JavaSerializer(conf: SparkConf) extends Serializer {
- def newInstance(): SerializerInstance = new JavaSerializerInstance
+ def newInstance(): SerializerInstance = new JavaSerializerInstance(conf)
}
@@ -35,6 +35,12 @@ import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
import org.apache.spark.util._
+sealed trait Values
+
+case class ByteBufferValues(buffer: ByteBuffer) extends Values
+case class IteratorValues(iterator: Iterator[Any]) extends Values
+case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
+
private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
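
The sealed trait introduced in this hunk replaces the Either[ArrayBuffer[Any], ByteBuffer] previously threaded through doPut, so a third shape (a lazy iterator) can be carried without stretching an Either. A standalone illustration of matching over such an ADT; the case classes are redeclared locally just for the example:

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

sealed trait Values
case class ByteBufferValues(buffer: ByteBuffer) extends Values
case class IteratorValues(iterator: Iterator[Any]) extends Values
case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values

// Because the trait is sealed, the compiler warns if a match misses a case.
def describe(v: Values): String = v match {
  case ByteBufferValues(b)  => s"${b.remaining()} serialized bytes"
  case IteratorValues(_)    => "a lazy iterator of values"
  case ArrayBufferValues(a) => s"${a.size} materialized values"
}
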
@@ -455,9 +461,7 @@ private[spark] class BlockManager(
def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
: Long = {
- val elements = new ArrayBuffer[Any]
- elements ++= values
- put(blockId, elements, level, tellMaster)
+ doPut(blockId, IteratorValues(values), level, tellMaster)
}
/**
@@ -479,7 +483,7 @@ private[spark] class BlockManager(
def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
tellMaster: Boolean = true) : Long = {
require(values != null, "Values is null")
- doPut(blockId, Left(values), level, tellMaster)
+ doPut(blockId, ArrayBufferValues(values), level, tellMaster)
}
/**
@@ -488,10 +492,11 @@ private[spark] class BlockManager(
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
tellMaster: Boolean = true) {
require(bytes != null, "Bytes is null")
- doPut(blockId, Right(bytes), level, tellMaster)
+ doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
}
- private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer],
+ private def doPut(blockId: BlockId,
+ data: Values,
level: StorageLevel, tellMaster: Boolean = true): Long = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -534,8 +539,9 @@ private[spark] class BlockManager(
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
- val replicationFuture = if (data.isRight && level.replication > 1) {
- val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper
+ val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
+ // Duplicate doesn't copy the bytes, just creates a wrapper
+ val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
Future {
replicate(blockId, bufferView, level)
}
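
As the comment above notes, duplicate() shares the underlying bytes and only gives the replication future its own position and limit, so the concurrent read cannot disturb the buffer being stored locally. A tiny standalone check of that behaviour:

import java.nio.ByteBuffer

val original = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
val view = original.duplicate()            // independent position/limit, same backing bytes
view.get()                                 // advances only the duplicate's position
assert(original.position() == 0)           // the original buffer is untouched
assert(view.array() eq original.array())   // no bytes were copied
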
@@ -549,34 +555,43 @@ private[spark] class BlockManager(
var marked = false
try {
- data match {
- case Left(values) => {
- if (level.useMemory) {
- // Save it just to memory first, even if it also has useDisk set to true; we will
- // drop it to disk later if the memory store can't hold it.
- val res = memoryStore.putValues(blockId, values, level, true)
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case Left(newIterator) => valuesAfterPut = newIterator
- }
- } else {
- // Save directly to disk.
- // Don't get back the bytes unless we replicate them.
- val askForBytes = level.replication > 1
- val res = diskStore.putValues(blockId, values, level, askForBytes)
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case _ =>
- }
+ if (level.useMemory) {
+ // Save it just to memory first, even if it also has useDisk set to true; we will
+ // drop it to disk later if the memory store can't hold it.
+ val res = data match {
+ case IteratorValues(values_i) =>
+ memoryStore.putValues(blockId, values_i, level, true)
+ case ArrayBufferValues(values_a) =>
+ memoryStore.putValues(blockId, values_a, level, true)
+ case ByteBufferValues(value_bytes) => {
+ value_bytes.rewind();
+ memoryStore.putBytes(blockId, value_bytes, level)
+ }
+ }
+ size = res.size
+ res.data match {
+ case Right(newBytes) => bytesAfterPut = newBytes
+ case Left(newIterator) => valuesAfterPut = newIterator
+ }
+ } else {
+ // Save directly to disk.
+ // Don't get back the bytes unless we replicate them.
+ val askForBytes = level.replication > 1
+
+ val res = data match {
+ case IteratorValues(values_i) =>
+ diskStore.putValues(blockId, values_i, level, askForBytes)
+ case ArrayBufferValues(values_a) =>
+ diskStore.putValues(blockId, values_a, level, askForBytes)
+ case ByteBufferValues(value_bytes) => {
+ value_bytes.rewind();
+ diskStore.putBytes(blockId, value_bytes, level)
}
}
- case Right(bytes) => {
- bytes.rewind()
- // Store it only in memory at first, even if useDisk is also set to true
- (if (level.useMemory) memoryStore else diskStore).putBytes(blockId, bytes, level)
- size = bytes.limit
+ size = res.size
+ res.data match {
+ case Right(newBytes) => bytesAfterPut = newBytes
+ case _ =>
}
}
@@ -605,8 +620,8 @@ private[spark] class BlockManager(
// values and need to serialize and replicate them now:
if (level.replication > 1) {
data match {
- case Right(bytes) => Await.ready(replicationFuture, Duration.Inf)
- case Left(values) => {
+ case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
+ case _ => {
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
@@ -28,7 +28,7 @@ import org.apache.spark.Logging
*/
private[spark]
abstract class BlockStore(val blockManager: BlockManager) extends Logging {
- def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel)
+ def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) : PutResult
/**
* Put in a block and, possibly, also return its content as either bytes or another Iterator.
@@ -37,9 +37,12 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
- def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
+ def putValues(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
returnValues: Boolean) : PutResult
+ def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
+ returnValues: Boolean) : PutResult
+
/**
* Return the size of a block in bytes.
*/
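
The BlockStore contract now has two putValues overloads, and putBytes returns a PutResult, presumably so callers can learn the stored size (and optionally get data back for replication) regardless of which shape went in. A toy sketch of that interface shape, using stand-in types rather than Spark's:

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Stand-in for Spark's PutResult: the stored size plus optionally returned data.
case class ToyPutResult(size: Long, data: Either[Iterator[Any], ByteBuffer])

abstract class ToyStore {
  def putBytes(key: String, bytes: ByteBuffer): ToyPutResult
  def putValues(key: String, values: Iterator[Any], returnValues: Boolean): ToyPutResult
  // The ArrayBuffer overload can simply delegate to the iterator version,
  // which is exactly what the DiskStore change below does.
  def putValues(key: String, values: ArrayBuffer[Any], returnValues: Boolean): ToyPutResult =
    putValues(key, values.toIterator, returnValues)
}
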
@@ -37,7 +37,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
diskManager.getBlockLocation(blockId).length
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
+ override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
// So that we do not modify the input offsets !
// duplicate does not copy buffer, so inexpensive
val bytes = _bytes.duplicate()
@@ -52,11 +52,21 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
+ return PutResult(bytes.limit(), Right(bytes.duplicate()))
+ }
+
+ override def putValues(
+ blockId: BlockId,
+ values: ArrayBuffer[Any],
+ level: StorageLevel,
+ returnValues: Boolean)
+ : PutResult = {
+ return putValues(blockId, values.toIterator, level, returnValues)
}
override def putValues(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
@@ -65,7 +75,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file)
- blockManager.dataSerializeStream(blockId, outputStream, values.iterator)
+ blockManager.dataSerializeStream(blockId, outputStream, values)
val length = file.length
val timeTaken = System.currentTimeMillis - startTime
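
With the iterator overload, DiskStore serializes straight from the iterator into the FileOutputStream, so nothing needs to be buffered beyond one element at a time. A standalone sketch of that streaming write using plain Java serialization (not Spark's pluggable serializer):

import java.io.{File, FileOutputStream, ObjectOutputStream}

// Write the elements of an iterator to disk one at a time and report the file size.
def writeIteratorToFile(values: Iterator[AnyRef], file: File): Long = {
  val out = new ObjectOutputStream(new FileOutputStream(file))
  try values.foreach(out.writeObject) finally out.close()
  file.length()
}

val tmp = File.createTempFile("block", ".bin")
val bytesWritten = writeIteratorToFile(Iterator.range(0, 1000).map(i => s"value-$i"), tmp)
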
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
+ override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
@@ -59,24 +59,45 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
tryToPut(blockId, elements, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(values.toIterator))
} else {
tryToPut(blockId, bytes, bytes.limit, false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
+ }
+ }
+
+ override def putValues(
+ blockId: BlockId,
+ values: ArrayBuffer[Any],
+ level: StorageLevel,
+ returnValues: Boolean)
+ : PutResult = {
+ if (level.deserialized) {
+ val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
+ tryToPut(blockId, values, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(values.toIterator))
+ } else {
+ val bytes = blockManager.dataSerialize(blockId, values.toIterator)
+ tryToPut(blockId, bytes, bytes.limit, false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
override def putValues(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
if (level.deserialized) {
- val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
- tryToPut(blockId, values, sizeEstimate, true)
- PutResult(sizeEstimate, Left(values.iterator))
+ val valueEntries = new ArrayBuffer[Any]()
+ valueEntries ++= values
+ val sizeEstimate = SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef])
+ tryToPut(blockId, valueEntries, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(valueEntries.toIterator))
} else {
- val bytes = blockManager.dataSerialize(blockId, values.iterator)
+ val bytes = blockManager.dataSerialize(blockId, values)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
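
The MemoryStore still drains the iterator into an ArrayBuffer when the level is deserialized, presumably because SizeEstimator needs the whole collection in memory to estimate its footprint, whereas the serialized path can consume the iterator directly. A sketch of those two paths with hypothetical stand-ins for SizeEstimator.estimate and dataSerialize:

import scala.collection.mutable.ArrayBuffer

// estimateSize and serialize are hypothetical stand-ins, not Spark functions.
def putIterator(values: Iterator[Any],
                deserialized: Boolean,
                estimateSize: AnyRef => Long,
                serialize: Iterator[Any] => Array[Byte]): Long = {
  if (deserialized) {
    // Storing deserialized objects: drain the iterator so the size can be estimated.
    val buf = new ArrayBuffer[Any]()
    buf ++= values
    estimateSize(buf)
  } else {
    // Storing serialized bytes: the iterator can be consumed element by element.
    serialize(values).length.toLong
  }
}
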