[SPARK-24307][CORE] Support reading remote cached partitions > 2gb #21440

Status: Closed. Wants to merge 6 commits; showing changes from 4 commits.
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
// TODO if we change this method to return the ManagedBuffer, then getRemoteValues
// could just use the inputStream on the temp file, rather than memory-mapping the file.
// Until then, replication can cause the process to use too much memory and get killed
// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
// we've read the data to disk.
Contributor Author:
btw, this fix is such low-hanging fruit that I would try to do it immediately afterwards. (I haven't filed a JIRA yet just because there are already so many defunct JIRAs related to this; I was going to wait till my changes got some traction.)

I think it's OK to get it in like this first, as this makes the behavior for 2.01 GB basically the same as it was for 1.99 GB.

Contributor Author:
Assuming this goes in shortly -- is anybody interested in picking up this TODO? Maybe @Ngone51 or @NiharS?
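For reference, a minimal sketch of what that follow-up TODO might look like, assuming BlockManager's existing serializerManager, BlockResult, and DataReadMethod machinery; the helper name and wiring here are hypothetical and not part of this PR:

// Hypothetical follow-up (not in this PR): hand the ManagedBuffer back from the
// remote fetch and deserialize from its InputStream, so a disk-backed fetch is
// streamed rather than memory-mapped into a ChunkedByteBuffer.
private def getRemoteValuesStreamed[T](blockId: BlockId, buf: ManagedBuffer)
    (implicit ct: ClassTag[T]): BlockResult = {
  // For a FileSegmentManagedBuffer, createInputStream() reads from the temp file.
  val values = serializerManager.dataDeserializeStream(blockId, buf.createInputStream())(ct)
  new BlockResult(values, DataReadMethod.Network, buf.size())
}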

Contributor (@jerryshao, Jul 19, 2018):

> not a java OOM, since its a memory-mapped file

I'm not sure why a memory-mapped file would use too much memory. AFAIK memory mapping is a lazy, page-wise loading mechanism: the system only loads the file segments that are actually accessed into memory pages, not the whole file. So from my understanding, even a machine with very little physical memory could map a super large file; memory mapping should not occupy too much memory.

Contributor Author:
To be honest I don't have a perfect understanding of this, but my impression is that it is not exactly lazy loading: the OS has a lot of leeway in deciding how much to keep in memory, and it should always release the memory under pressure. This is problematic under YARN, where the container's memory use is monitored independently of the OS. The OS thinks it's fine to keep large amounts of data in physical memory, but then the YARN NM looks at the memory use of that specific process tree, decides it's over the limits it has configured, and kills it.

At least, I've seen cases of YARN killing things for exceeding memory limits where I thought that was the cause, though I did not directly confirm it.

Contributor:
I see. I agree with you that YARN could have some issues in calculating the exact memory usage.

logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
@@ -689,7 +694,7 @@ private[spark] class BlockManager(
logDebug(s"Getting remote block $blockId from $loc")
val data = try {
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager)
} catch {
case NonFatal(e) =>
runningFailureCount += 1
Expand Down Expand Up @@ -723,7 +728,9 @@ private[spark] class BlockManager(
}

if (data != null) {
return Some(new ChunkedByteBuffer(data))
val chunkSize =
conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
Contributor:
nit: make chunkSize a private field in BlockManager instead of recomputing it each time?
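A minimal sketch of that nit; the field name is made up for illustration, and only the placement of the computation changes:

// Sketch: compute the conversion chunk size once as a field on BlockManager,
// instead of re-reading the conf on every getRemoteBytes() call.
private[this] val remoteBlockChunkSize: Int =
  conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt

// ...and then below:
// return Some(ChunkedByteBuffer.fromManagedBuffer(data, remoteBlockChunkSize))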

return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize))
}
logDebug(s"The value of block $blockId is null")
}
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -17,17 +17,21 @@

package org.apache.spark.util.io

import java.io.InputStream
import java.io.{File, FileInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel
import java.nio.channels.{FileChannel, WritableByteChannel}

import scala.collection.mutable.ListBuffer

import com.google.common.primitives.UnsignedBytes
import io.netty.buffer.{ByteBuf, Unpooled}

import org.apache.spark.SparkEnv
import org.apache.spark.internal.config
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.util.ByteArrayWritableChannel
import org.apache.spark.storage.StorageUtils
import org.apache.spark.util.Utils

Contributor:
nit: this blank line seems unnecessary.


/**
* Read-only byte buffer which is physically stored as multiple chunks rather than a single
@@ -81,10 +85,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
}

/**
* Wrap this buffer to view it as a Netty ByteBuf.
* Wrap this in a custom "FileRegion" which allows us to transfer over 2 GB.
*/
def toNetty: ByteBuf = {
Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
def toNetty: ChunkedByteBufferFileRegion = {
new ChunkedByteBufferFileRegion(this, bufferWriteChunkSize)
}

/**
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {

}

object ChunkedByteBuffer {
// TODO eliminate this method if we switch BlockManager to getting InputStreams
def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
data match {
case f: FileSegmentManagedBuffer =>
map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
case other =>
new ChunkedByteBuffer(other.nioByteBuffer())
}
}

def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = {
Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel =>
Contributor:
Can we please use FileChannel#open instead? FileInputStream/FileOutputStream have some issues (https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful).

Contributor Author:
I wasn't aware of that issue, thanks for sharing it; I'll update this. Should we also update other uses? It seems there are a lot of other cases, e.g. UnsafeShuffleWriter, DiskBlockObjectWriter, etc.

Contributor:
I've already updated some of them in SPARK-21475 in the shuffle-related code path, but not the other, less critical ones.

Contributor Author:
Great, thanks for the explanation.
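A minimal sketch of the FileChannel#open suggestion above, keeping the surrounding chunked-mapping loop unchanged (illustrative only):

import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

// FileChannel.open avoids the finalizer overhead that FileInputStream carries,
// which is the issue the linked post describes.
Utils.tryWithResource(FileChannel.open(file.toPath, StandardOpenOption.READ)) { channel =>
  // ...same channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize) loop as below
}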

var remaining = length
var pos = offset
val chunks = new ListBuffer[ByteBuffer]()
while (remaining > 0) {
val chunkSize = math.min(remaining, maxChunkSize)
val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
Contributor:
Wondering if we could make these FileRegions instead, and use transferTo instead of write in ChunkedByteBufferFileRegion?

Contributor Author:
I'm not sure I understand. What FileRegion are you referring to -- the only one I know of is netty's interface. Do you mean implement another FileRegion for each chunk, and then have ChunkedByteBufferFileRegion delegate to that?

We could do that, but I don't think it would be any better; ChunkedByteBufferFileRegion.transferTo would be about as complex as it is now. Also, it may be worth noting that this particular method really should disappear -- we shouldn't be mapping this at all, we should be using an input stream (see the TODO above), but I want to do that separately.

Contributor:
I was thinking of DefaultFileRegion, but any other zero-copy impl should be fine.

Contributor Author:
I think your concern is that when we are going to send data that is backed by a file, e.g. a remote read of an RDD cached on disk, we should be able to send it using something more efficient than memory-mapping the entire file. Is that correct?

That actually isn't a problem. This map() method isn't called when sending disk-cached RDDs; that is already handled correctly by FileSegmentManagedBuffer.convertToNetty(), which uses the DefaultFileRegion you had in mind. The map() method is only used on the receiving end, after the data has already been transferred, and just to pass the data on to other Spark code locally in the executor. (And that will avoid the map() entirely after the TODO above.)

I needed to add ChunkedByteBufferFileRegion for data that is already in memory as a ChunkedByteBuffer, e.g. for memory-cached RDDs.

Contributor:
Perfect, thanks for clarifying!
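For readers following the thread, a rough sketch of the existing send-side path being described, using netty's DefaultFileRegion; this is illustrative and not code from this PR:

import java.io.{File, RandomAccessFile}
import io.netty.channel.DefaultFileRegion

// Sketch: a block backed by an on-disk file segment is already sent zero-copy; the
// transport wraps the segment in a DefaultFileRegion so it can be written with
// transferTo, with no memory mapping on the sending side.
def fileSegmentToNettySketch(file: File, offset: Long, length: Long): DefaultFileRegion =
  new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel, offset, length)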

pos += chunkSize
remaining -= chunkSize
chunks += chunk
}
new ChunkedByteBuffer(chunks.toArray)
}
}
}

/**
* Reads data from a ChunkedByteBuffer.
*
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala (new file)
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.io

import java.nio.channels.WritableByteChannel

import io.netty.channel.FileRegion
import io.netty.util.AbstractReferenceCounted

import org.apache.spark.internal.Logging
import org.apache.spark.network.util.AbstractFileRegion


/**
* This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty
* message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion,
* even though the data is not backed by a file.
*/
private[io] class ChunkedByteBufferFileRegion(
private val chunkedByteBuffer: ChunkedByteBuffer,
private val ioChunkSize: Int) extends AbstractFileRegion {

private var _transferred: Long = 0
// this duplicates the original chunks, so we're free to modify the position, limit, etc.
private val chunks = chunkedByteBuffer.getChunks()
private val size = chunks.foldLeft(0) { _ + _.remaining() }
Contributor:
0L? Otherwise this will overflow for > 2G right?
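A one-line sketch of the suggested fix: seed the fold with a Long so the total cannot overflow Int.

// Accumulate in a Long; seeding with 0 (an Int) would wrap for buffers over 2 GB.
private val size: Long = chunks.foldLeft(0L) { _ + _.remaining() }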


protected def deallocate: Unit = {}

override def count(): Long = size

// this is the "start position" of the overall Data in the backing file, not our current position
override def position(): Long = 0

override def transferred(): Long = _transferred

private var currentChunkIdx = 0

def transferTo(target: WritableByteChannel, position: Long): Long = {
assert(position == _transferred)
if (position == size) return 0L
var keepGoing = true
var written = 0L
var currentChunk = chunks(currentChunkIdx)
while (keepGoing) {
while (currentChunk.hasRemaining && keepGoing) {
val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
val originalLimit = currentChunk.limit()
currentChunk.limit(currentChunk.position() + ioSize)
val thisWriteSize = target.write(currentChunk)
currentChunk.limit(originalLimit)
written += thisWriteSize
if (thisWriteSize < ioSize) {
Contributor:
What happens if thisWriteSize is smaller than ioSize? Will Spark throw an exception or something else?

Contributor Author:
Actually this is a totally normal condition; it just means the channel is not currently ready to accept any more data. This is something netty expects, and it will make sure the rest of the data is put on the channel eventually (transferTo will get called again with the correct position argument indicating how far along it is).

The added unit tests cover this.

Contributor:
I see, thanks for explaining.

// the channel did not accept our entire write. We do *not* keep trying -- netty wants
// us to just stop, and report how much we've written.
keepGoing = false
}
}
if (keepGoing) {
// advance to the next chunk (if there are any more)
currentChunkIdx += 1
if (currentChunkIdx == chunks.size) {
keepGoing = false
} else {
currentChunk = chunks(currentChunkIdx)
}
}
}
_transferred += written
written
}
}
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala (new file)
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.io

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

import scala.util.Random

import org.mockito.Mockito.when
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mockito.MockitoSugar

import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.internal.config
import org.apache.spark.util.io.ChunkedByteBuffer

class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar
with BeforeAndAfterEach {

override protected def beforeEach(): Unit = {
super.beforeEach()
val conf = new SparkConf()
val env = mock[SparkEnv]
SparkEnv.set(env)
when(env.conf).thenReturn(conf)
}

override protected def afterEach(): Unit = {
SparkEnv.set(null)
}

private def generateChunkedByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = {
val bytes = (0 until nChunks).map { chunkIdx =>
val bb = ByteBuffer.allocate(perChunk)
(0 until perChunk).foreach { idx =>
bb.put((chunkIdx * perChunk + idx).toByte)
}
bb.position(0)
bb
}.toArray
new ChunkedByteBuffer(bytes)
}

test("transferTo can stop and resume correctly") {
SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
val cbb = generateChunkedByteBuffer(4, 10)
val fileRegion = cbb.toNetty

val targetChannel = new LimitedWritableByteChannel(40)

var pos = 0L
// write the fileregion to the channel, but with the transfer limited at various spots along
// the way.

// limit to within the first chunk
targetChannel.acceptNBytes = 5
pos = fileRegion.transferTo(targetChannel, pos)
assert(targetChannel.pos === 5)

// a little bit further within the first chunk
targetChannel.acceptNBytes = 2
pos += fileRegion.transferTo(targetChannel, pos)
assert(targetChannel.pos === 7)

// past the first chunk, into the 2nd
targetChannel.acceptNBytes = 6
pos += fileRegion.transferTo(targetChannel, pos)
assert(targetChannel.pos === 13)

// right to the end of the 2nd chunk
targetChannel.acceptNBytes = 7
pos += fileRegion.transferTo(targetChannel, pos)
assert(targetChannel.pos === 20)

// rest of 2nd chunk, all of 3rd, some of 4th
targetChannel.acceptNBytes = 15
pos += fileRegion.transferTo(targetChannel, pos)
assert(targetChannel.pos === 35)

// now till the end
targetChannel.acceptNBytes = 5
pos += fileRegion.transferTo(targetChannel, pos)
assert(targetChannel.pos === 40)

// calling again at the end should be OK
targetChannel.acceptNBytes = 20
fileRegion.transferTo(targetChannel, pos)
assert(targetChannel.pos === 40)
}

test(s"transfer to with random limits") {
val rng = new Random()
val seed = System.currentTimeMillis()
logInfo(s"seed = $seed")
rng.setSeed(seed)
val chunkSize = 1e4.toInt
SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong)

val cbb = generateChunkedByteBuffer(50, chunkSize)
val fileRegion = cbb.toNetty
val transferLimit = 1e5.toInt
val targetChannel = new LimitedWritableByteChannel(transferLimit)
while (targetChannel.pos < cbb.size) {
val nextTransferSize = rng.nextInt(transferLimit)
targetChannel.acceptNBytes = nextTransferSize
fileRegion.transferTo(targetChannel, targetChannel.pos)
}
assert(0 === fileRegion.transferTo(targetChannel, targetChannel.pos))
}

/**
* This mocks a channel which only accepts a limited number of bytes at a time. It also verifies
* the written data matches our expectations as the data is received.
*/
private class LimitedWritableByteChannel(maxWriteSize: Int) extends WritableByteChannel {
val bytes = new Array[Byte](maxWriteSize)
var acceptNBytes = 0
var pos = 0

override def write(src: ByteBuffer): Int = {
val length = math.min(acceptNBytes, src.remaining())
src.get(bytes, 0, length)
Member (@Ngone51, May 28, 2018):
We overwrite the bytes array's previously written data?

Contributor Author:
Yes, this is just test code; we're just checking that the data that gets written is what we expect (which we know based on the absolute position). Really, I could read just one byte at a time and check that it is the right data, but it seemed a little easier this way.

acceptNBytes -= length
// verify we got the right data
(0 until length).foreach { idx =>
assert(bytes(idx) === (pos + idx).toByte, s"; wrong data at ${pos + idx}")
Member:
; ?

Member:
${pos + idx} or ${idx} ?

Contributor Author:
';' because it separates the automatic portion of the error msg, making it easier to read IMO:

0 did not equal 1; wrong data at 0

pos + idx I think is more appropriate; it's more helpful to know the position in the overall stream of data.

Member (@Ngone51, May 30, 2018):
I see. It's just that overwriting the bytes array while checking against the virtual ${pos + idx} position made me a little confused. Anyway, it is a really well-designed test, especially the data-verification part.

}
pos += length
length
}

override def isOpen: Boolean = true

override def close(): Unit = {}
}

}
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -34,7 +34,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
assert(emptyChunkedByteBuffer.getChunks().isEmpty)
assert(emptyChunkedByteBuffer.toArray === Array.empty)
assert(emptyChunkedByteBuffer.toByteBuffer.capacity() === 0)
assert(emptyChunkedByteBuffer.toNetty.capacity() === 0)
assert(emptyChunkedByteBuffer.toNetty.count() === 0)
emptyChunkedByteBuffer.toInputStream(dispose = false).close()
emptyChunkedByteBuffer.toInputStream(dispose = true).close()
}