Skip to content

Commit

Permalink
fix compile errors
Browse files Browse the repository at this point in the history
  • Loading branch information
ConeyLiu committed Jan 21, 2018
1 parent a2b9513 commit 9d7c52d
Showing 1 changed file with 91 additions and 74 deletions.
165 changes: 91 additions & 74 deletions core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
Expand Up @@ -56,12 +56,6 @@ private case class SerializedMemoryEntry[T](
def size: Long = buffer.size
}

private trait ValuesHolder[T] {
def storeValue(value: T): Unit
def esitimatedSize(roughly: Boolean): Long
def buildEntry(): MemoryEntry[T]
}

private[storage] trait BlockEvictionHandler {
/**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
Expand Down Expand Up @@ -303,34 +297,7 @@ private[spark] class MemoryStore(
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

val valuesHolder = new ValuesHolder[T] {
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[T]()(classTag)
var arrayValues: Array[T] = null
var preciseSize: Long = -1

override def storeValue(value: T): Unit = {
vector += value
}

override def esitimatedSize(roughly: Boolean): Long = {
if (!roughly) {
// We only need the more precise size after all values unrolled.
arrayValues = vector.toArray
vector = null
preciseSize = SizeEstimator.estimate(arrayValues)
preciseSize
} else {
// For performance, rough estimate
vector.estimateSize()
}
}

override def buildEntry(): MemoryEntry[T] = {
// We successfully unrolled the entirety of this block
DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
}
}
val valuesHolder = new DeserializedValuesHolder[T](classTag)

putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
case Right(storedSize) => Right(storedSize)
Expand Down Expand Up @@ -370,48 +337,21 @@ private[spark] class MemoryStore(

require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

val valuesHolder = new ValuesHolder[T] {
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}

// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
val redirectableStream = new RedirectableOutputStream
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
s"is too large to be set as chunk size. Chunk size has been capped to " +
s"${Utils.bytesToString(Int.MaxValue)}")
Int.MaxValue
} else {
initialMemoryThreshold.toInt
}

val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}

override def storeValue(value: T): Unit = {
serializationStream.writeObject(value)(classTag)
}

override def esitimatedSize(roughly: Boolean): Long = {
if (!roughly) {
serializationStream.close()
}
bbos.size
}

override def buildEntry(): MemoryEntry[T] = {
SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
}
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
val redirectableStream = new RedirectableOutputStream
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
s"is too large to be set as chunk size. Chunk size has been capped to " +
s"${Utils.bytesToString(Int.MaxValue)}")
Int.MaxValue
} else {
initialMemoryThreshold.toInt
}

val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
memoryMode, serializerManager)

putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
case Right(storedSize) => Right(storedSize)
case Left(unrollMemoryUsedByThisBlock) =>
Expand Down Expand Up @@ -703,6 +643,83 @@ private[spark] class MemoryStore(
}
}

private trait ValuesHolder[T] {
def storeValue(value: T): Unit
def esitimatedSize(roughly: Boolean): Long
def buildEntry(): MemoryEntry[T]
}

/**
* A holder for storing the deserialized values.
*/
private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[T]()(classTag)
var arrayValues: Array[T] = null
var preciseSize: Long = -1

override def storeValue(value: T): Unit = {
vector += value
}

override def esitimatedSize(roughly: Boolean): Long = {
if (!roughly) {
// We only need the more precise size after all values unrolled.
arrayValues = vector.toArray
vector = null
preciseSize = SizeEstimator.estimate(arrayValues)
preciseSize
} else {
// For performance, rough estimate
vector.estimateSize()
}
}

override def buildEntry(): MemoryEntry[T] = {
// We successfully unrolled the entirety of this block
DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
}
}

/**
* A holder for storing the serialized values.
*/
private class SerializedValuesHolder[T](
blockId: BlockId,
chunkSize: Int,
classTag: ClassTag[T],
memoryMode: MemoryMode,
serializerManager: SerializerManager) extends ValuesHolder[T] {
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}

val redirectableStream = new RedirectableOutputStream
val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}

override def storeValue(value: T): Unit = {
serializationStream.writeObject(value)(classTag)
}

override def esitimatedSize(roughly: Boolean): Long = {
if (!roughly) {
serializationStream.close()
}
bbos.size
}

override def buildEntry(): MemoryEntry[T] = {
SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
}
}

/**
* The result of a failed [[MemoryStore.putIteratorAsValues()]] call.
*
Expand Down

0 comments on commit 9d7c52d

Please sign in to comment.