
review feedback
squito committed May 29, 2018
1 parent 4373e27 commit a9cfe29
Showing 3 changed files with 5 additions and 8 deletions.
BlockManager.scala
@@ -661,7 +661,7 @@ private[spark] class BlockManager(
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
// TODO if we change this method to return the ManagedBuffer, then getRemoteValues
// could just use the inputStream on the temp file, rather than memory-mapping the file.
- // Until then, replication can go cause the process to use too much memory and get killed
+ // Until then, replication can cause the process to use too much memory and get killed
// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
// we've read the data to disk.
logDebug(s"Getting remote block $blockId")
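For context on the comment being fixed: until getRemoteBytes returns a ManagedBuffer, the fetched block is memory-mapped, so the mapped pages are charged to the process even after the bytes are safely on disk. A minimal standalone sketch of the two approaches the TODO contrasts (plain JDK calls, not Spark's API; the helper names here are hypothetical):

import java.io.{File, FileInputStream, InputStream}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

// Memory-mapping: the whole file enters the process's address space, so a
// cluster manager enforcing a memory limit may kill the process even though
// no java.lang.OutOfMemoryError is ever thrown.
def mapWholeFile(file: File): MappedByteBuffer = {
  val ch = FileChannel.open(file.toPath, StandardOpenOption.READ)
  try ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size())
  finally ch.close()
}

// Streaming: bytes are read through a small, bounded buffer, so memory use
// stays flat regardless of file size.
def streamFile(file: File): InputStream = new FileInputStream(file)

This is exactly the failure mode the comment describes: a mapped read can trip a container memory limit without any Java-level OOM.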
ChunkedByteBufferFileRegion.scala
@@ -38,11 +38,10 @@ private[io] class ChunkedByteBufferFileRegion(
private val chunks = chunkedByteBuffer.getChunks()
private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
private val size = cumLength.last
- // Chunk size in bytes

protected def deallocate: Unit = {}

- override def count(): Long = chunkedByteBuffer.size
+ override def count(): Long = size

// this is the "start position" of the overall Data in the backing file, not our current position
override def position(): Long = 0
@@ -73,7 +72,6 @@ private[io] class ChunkedByteBufferFileRegion(
var keepGoing = true
var written = 0L
var currentChunk = chunks(currentChunkIdx)
- var originalLimit = currentChunk.limit()
while (keepGoing) {
while (currentChunk.hasRemaining && keepGoing) {
val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
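The count() change works because the class already precomputes size from the scanLeft over chunk lengths. For reference, a standalone sketch (plain Scala, separate from the Spark class) of what cumLength holds:

import java.nio.ByteBuffer

val chunks = Seq(ByteBuffer.allocate(10), ByteBuffer.allocate(4), ByteBuffer.allocate(7))
// scanLeft produces the running byte offset of each chunk: Vector(0, 10, 14, 21)
val cumLength = chunks.scanLeft(0L) { _ + _.remaining() }
// The last offset is the total byte count across all chunks.
val size = cumLength.last  // 21

Returning the locally derived size keeps count() consistent with the cumulative offsets the class uses elsewhere, rather than deferring to the backing buffer.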
ChunkedByteBufferFileRegionSuite.scala
@@ -44,7 +44,7 @@ class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar
SparkEnv.set(null)
}

- private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = {
+ private def generateChunkedByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = {
val bytes = (0 until nChunks).map { chunkIdx =>
val bb = ByteBuffer.allocate(perChunk)
(0 until perChunk).foreach { idx =>
@@ -58,7 +58,7 @@ class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar

test("transferTo can stop and resume correctly") {
SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
- val cbb = generateChunkByteBuffer(4, 10)
+ val cbb = generateChunkedByteBuffer(4, 10)
val fileRegion = cbb.toNetty

val targetChannel = new LimitedWritableByteChannel(40)
@@ -111,7 +111,7 @@ class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar
val chunkSize = 1e4.toInt
SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong)

- val cbb = generateChunkByteBuffer(50, chunkSize)
+ val cbb = generateChunkedByteBuffer(50, chunkSize)
val fileRegion = cbb.toNetty
val transferLimit = 1e5.toInt
val targetChannel = new LimitedWritableByteChannel(transferLimit)
@@ -134,7 +134,6 @@ class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar
var pos = 0

override def write(src: ByteBuffer): Int = {
- val origSrcPos = src.position()
val length = math.min(acceptNBytes, src.remaining())
src.get(bytes, 0, length)
acceptNBytes -= length
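The deleted origSrcPos line drops what appears to be an unused variable in the test's write-limited channel. For reference, the stop-and-resume behavior the suite checks follows the standard Netty FileRegion contract: transferTo may make only partial progress when the channel accepts fewer bytes, and the caller retries from the transferred offset. A hedged sketch of such a driving loop (assuming Netty's FileRegion API; not the suite's exact code):

import io.netty.channel.FileRegion
import java.nio.channels.WritableByteChannel

// Repeatedly call transferTo from the current offset until the whole region
// has been flushed; each call may transfer only part of the remaining bytes.
// (Assumes the channel eventually accepts more bytes, as the test channel does.)
def drainRegion(region: FileRegion, channel: WritableByteChannel): Unit = {
  while (region.transferred() < region.count()) {
    region.transferTo(channel, region.transferred())
  }
}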
