[SPARK-25422][CORE] Don't memory map blocks streamed to disk.
After data has been streamed to disk, the buffers are inserted into the
memory store in some cases (e.g., with broadcast blocks). But the broadcast
code also disposes of those buffers once the data has been read, to ensure
we don't leave mapped buffers using up memory. Because the memory store still
references those buffers, disposing memory-mapped buffers leaves garbage data
in the memory store. This change therefore reads the streamed file into plain
heap buffers instead of memory mapping it.
squito committed Sep 21, 2018
1 parent 67f2cb6 commit aee82ab
Showing 2 changed files with 27 additions and 29 deletions.
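To make the lifecycle issue concrete before the diff, here is a standalone sketch (not part of this commit, and not Spark code) contrasting the two kinds of buffers. A memory-mapped buffer stays backed by the file and must eventually be unmapped, so disposing it while something like the memory store still holds a reference produces the garbage reads described above; a plain heap copy is ordinary GC-managed memory with no such hazard.

import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

// Standalone illustration only: contrasts FileChannel.map (what the old code did)
// with reading the file into a plain heap ByteBuffer (what the new code does).
object MappedVsHeapSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("block", ".bin")
    file.deleteOnExit()
    val out = new FileOutputStream(file)
    out.write(Array.fill[Byte](16)(42))
    out.close()

    // Memory-mapped: off-heap, backed by the file, must eventually be unmapped/disposed.
    val channel = FileChannel.open(file.toPath, StandardOpenOption.READ)
    val mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, file.length())
    channel.close()

    // Heap copy: ordinary JVM memory, reclaimed by the GC like any other object.
    val in = FileChannel.open(file.toPath, StandardOpenOption.READ)
    val heap = ByteBuffer.allocate(file.length().toInt)
    while (heap.hasRemaining && in.read(heap) >= 0) {}
    in.close()
    heap.flip()

    println(s"mapped: ${mapped.get(0)}, heap copy: ${heap.get(0)}")  // both print 42
  }
}

The old ChunkedByteBuffer.map produced the mapped kind of buffer; the new fromFile in the second file below produces heap-backed chunks.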
13 changes: 5 additions & 8 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -438,10 +438,8 @@ private[spark] class BlockManager(
         // stream.
         channel.close()
         // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
-        // using a lot of memory here. With encryption, we'll read the whole file into a regular
-        // byte buffer and OOM. Without encryption, we'll memory map the file and won't get a jvm
-        // OOM, but might get killed by the OS / cluster manager. We could at least read the tmp
-        // file as a stream in both cases.
+        // using a lot of memory here. We'll read the whole file into a regular
+        // byte buffer and OOM. We could at least read the tmp file as a stream.
         val buffer = securityManager.getIOEncryptionKey() match {
           case Some(key) =>
             // we need to pass in the size of the unencrypted block
@@ -453,7 +451,7 @@
             new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
 
           case None =>
-            ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+            ChunkedByteBuffer.fromFile(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
         }
         putBytes(blockId, buffer, level)(classTag)
         tmpFile.delete()
@@ -726,10 +724,9 @@ private[spark] class BlockManager(
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
     // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
-    // could just use the inputStream on the temp file, rather than memory-mapping the file.
+    // could just use the inputStream on the temp file, rather than reading the file into memory.
     // Until then, replication can cause the process to use too much memory and get killed
-    // by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though
-    // we've read the data to disk.
+    // even though we've read the data to disk.
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     var runningFailureCount = 0
43 changes: 22 additions & 21 deletions core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -19,17 +19,16 @@ package org.apache.spark.util.io
 
 import java.io.{File, FileInputStream, InputStream}
 import java.nio.ByteBuffer
-import java.nio.channels.{FileChannel, WritableByteChannel}
-import java.nio.file.StandardOpenOption
-
-import scala.collection.mutable.ListBuffer
+import java.nio.channels.WritableByteChannel
 
+import com.google.common.io.ByteStreams
 import com.google.common.primitives.UnsignedBytes
+import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
 import org.apache.spark.storage.StorageUtils
 import org.apache.spark.util.Utils
 
@@ -175,30 +174,32 @@ object ChunkedByteBuffer {
   def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
-        map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+        fromFile(f.getFile, maxChunkSize, f.getOffset, f.getLength)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
   }
 
-  def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
-    map(file, maxChunkSize, 0, file.length())
+  def fromFile(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
+    fromFile(file, maxChunkSize, 0, file.length())
   }
 
-  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = {
-    Utils.tryWithResource(FileChannel.open(file.toPath, StandardOpenOption.READ)) { channel =>
-      var remaining = length
-      var pos = offset
-      val chunks = new ListBuffer[ByteBuffer]()
-      while (remaining > 0) {
-        val chunkSize = math.min(remaining, maxChunkSize)
-        val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
-        pos += chunkSize
-        remaining -= chunkSize
-        chunks += chunk
-      }
-      new ChunkedByteBuffer(chunks.toArray)
+  def fromFile(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = {
+    // We do *not* memory map the file, because we may end up putting this into the memory store,
+    // and Spark currently does not expect memory-mapped buffers in the memory store; that conflicts
+    // with other parts that manage the lifecycle of buffers and dispose them. See SPARK-25422.
+    val is = new FileInputStream(file)
+    ByteStreams.skipFully(is, offset)
+    val in = new LimitedInputStream(is, length)
+    val chunkSize = math.min(maxChunkSize, length).toInt
+    val out = new ChunkedByteBufferOutputStream(chunkSize, ByteBuffer.allocate _)
+    Utils.tryWithSafeFinally {
+      IOUtils.copy(in, out)
+    } {
+      in.close()
+      out.close()
     }
+    out.toChunkedByteBuffer
   }
 }

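For readers outside the Spark codebase (ChunkedByteBuffer is private[spark]), here is a hedged, standalone sketch of the technique the new fromFile uses: skip to the requested offset, bound the read to length bytes, and copy the data into heap ByteBuffer chunks of at most maxChunkSize instead of memory mapping the region. The object and method names below are illustrative, not Spark API.

import java.io.{BufferedInputStream, File, FileInputStream, InputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

object ReadRangeSketch {
  // Read file bytes [offset, offset + length) into heap chunks of at most maxChunkSize.
  def readRangeIntoHeapChunks(
      file: File, maxChunkSize: Int, offset: Long, length: Long): Array[ByteBuffer] = {
    val in: InputStream = new BufferedInputStream(new FileInputStream(file))
    try {
      // Skip to the requested offset (InputStream.skip may skip fewer bytes than asked).
      var skipped = 0L
      while (skipped < offset) {
        val s = in.skip(offset - skipped)
        require(s > 0, s"could not skip to offset $offset")
        skipped += s
      }
      val chunks = ArrayBuffer.empty[ByteBuffer]
      var remaining = length
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxChunkSize).toInt
        val bytes = new Array[Byte](chunkSize)
        var read = 0
        while (read < chunkSize) {
          val n = in.read(bytes, read, chunkSize - read)
          require(n >= 0, "unexpected end of file")
          read += n
        }
        chunks += ByteBuffer.wrap(bytes)  // heap buffer: nothing to unmap or dispose later
        remaining -= chunkSize
      }
      chunks.toArray
    } finally {
      in.close()
    }
  }
}

For example, a 100 MB range with a 4 MB maxChunkSize comes back as 25 heap chunks. The real implementation streams through ChunkedByteBufferOutputStream with a single fixed chunk size of min(maxChunkSize, length) rather than collecting an ArrayBuffer, but the effect is the same: no memory-mapped buffers ever reach the memory store.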
