[SPARK-24307][CORE] Support reading remote cached partitions > 2gb #21440
Conversation
(1) Netty's ByteBuf cannot support data > 2gb. So to transfer data from a ChunkedByteBuffer over the network, we use a custom version of FileRegion which is backed by the ChunkedByteBuffer. (2) On the receiving end, we need to expose all the data in a FileSegmentManagedBuffer as a ChunkedByteBuffer. We do that by memory mapping the entire file in chunks. Added unit tests. Also tested on a cluster with remote cache reads > 2gb (in memory and on disk).
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
   * Get block from remote block managers as serialized bytes.
   */
  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
    // could just use the inputStream on the temp file, rather than memory-mapping the file.
    // Until then, replication can go cause the process to use too much memory and get killed
grammar
Test build #91194 has finished for PR 21440 at commit
private val chunks = chunkedByteBuffer.getChunks()
private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
private val size = cumLength.last
// Chunk size in bytes
Should this comment be moved above the last line?
protected def deallocate: Unit = {}

override def count(): Long = chunkedByteBuffer.size
What's the difference between size and count? Should count indicate the size of the remaining data that can be transferred?
no difference, count() is just to satisfy an interface. My mistake for having them look different, I'll make them the same
var keepGoing = true
var written = 0L
var currentChunk = chunks(currentChunkIdx)
var originalLimit = currentChunk.limit()
It seems unused.
acceptNBytes -= length
// verify we got the right data
(0 until length).foreach { idx =>
  assert(bytes(idx) === (pos + idx).toByte, s"; wrong data at ${pos + idx}")
;
?
${pos + idx} or ${idx}?
';' because it separates the automatic portion of the error msg, making it easier to read IMO:
0 did not equal 1; wrong data at 0
pos + idx I think is more appropriate; it's more helpful to know the position in the overall stream of data.
I see. It's just that overwriting the bytes array while checking against the virtual ${pos + idx} stream position made me a little confused. Anyway, it is a really well-designed test, especially the data-verification part.
SparkEnv.set(null)
}

private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = {
nit: generateChunkedByteBuffer
var pos = 0

override def write(src: ByteBuffer): Int = {
  val origSrcPos = src.position()
This also seems unused.
override def write(src: ByteBuffer): Int = {
  val origSrcPos = src.position()
  val length = math.min(acceptNBytes, src.remaining())
  src.get(bytes, 0, length)
Are we overwriting the bytes array's previously written data?
yes, this is just test code; we're just checking that the data that gets written is what we expect (which we know based on the absolute position). Really, I could read just one byte at a time and check that it is the right data, but it seemed a little easier this way.
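For reference, the mock channel described above can be sketched roughly like this. This is an illustrative, hypothetical version (the class name `LimitedWriteChannel` and its shape are assumptions, not the actual test code): it accepts at most a fixed number of bytes per write() call and verifies each byte against its absolute position in the overall stream.

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Hypothetical sketch of the mock channel from the test: accepts at most
// `maxWriteSize` bytes per write() and verifies each byte against its
// absolute position in the overall stream of data.
class LimitedWriteChannel(maxWriteSize: Int) extends WritableByteChannel {
  private var pos = 0L   // absolute position in the overall stream

  override def isOpen: Boolean = true
  override def close(): Unit = {}

  override def write(src: ByteBuffer): Int = {
    // only accept a limited number of bytes at a time
    val length = math.min(maxWriteSize, src.remaining())
    val bytes = new Array[Byte](length)
    src.get(bytes, 0, length)
    // verify we got the right data, based on the absolute stream position
    (0 until length).foreach { idx =>
      assert(bytes(idx) == (pos + idx).toByte, s"; wrong data at ${pos + idx}")
    }
    pos += length
    length
  }
}
```

Because the expected value of every byte is a function of its absolute position, the check works even though each write() call reuses (overwrites) the same scratch array.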
// could just use the inputStream on the temp file, rather than memory-mapping the file.
// Until then, replication can cause the process to use too much memory and get killed
// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
// we've read the data to disk.
btw this fix is such low-hanging fruit that I would try to do it immediately afterwards. (I haven't filed a jira yet just because there are already so many defunct jiras related to this; I was going to wait till my changes got some traction.)
I think it's OK to get it in like this first, as this makes the behavior for 2.01 gb basically the same as it was for 1.99 gb.
not a java OOM, since its a memory-mapped file
I'm not sure why a memory-mapped file would cause too much memory use. AFAIK memory mapping is a lazy, page-wise loading mechanism: the system only loads the to-be-accessed file segments into memory pages, not the whole file. So from my understanding, even very small physical memory could map a super large file. Memory mapping should not occupy too much memory.
to be honest I don't have a perfect understanding of this, but my impression is that it is not exactly lazy loading: the OS has a lot of leeway in deciding how much to keep in memory, but it should always release the memory under pressure. This is problematic under yarn, where the container's memory use is monitored independently of the OS. So the OS thinks it's fine to put large amounts of data in physical memory, but then the yarn NM looks at the memory use of the specific process tree, decides it's over the limits it has configured, and so kills it.
At least, I've seen cases of yarn killing things for exceeding memory limits where I thought that was the cause, though I did not directly confirm it.
I see. I agree with you that YARN could have some issues in calculating the exact memory usage.
thanks for the reviews @markhamstra @Ngone51, I've updated the pr
Test build #91259 has finished for PR 21440 at commit
@vanzin @JoshRosen this is also ready in the sequence of 2GB limit related changes. (I'll update #21451 now that the first change has gone in)
It feels like transferTo could be simpler, but after thinking for a while I couldn't really come up with something...
}
}

def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
Is this used anywhere? Couldn't find a reference.
this version isn't used till the other PR. I can pull it out there
the other version of map is used in this pr from BlockManager.getRemoteBytes() -> ChunkedByteBuffer.fromManagedBuffer() -> ChunkedByteBuffer.map
*/
private[io] class ChunkedByteBufferFileRegion(
    val chunkedByteBuffer: ChunkedByteBuffer,
    val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging {
Extend AbstractFileRegion?
Do the fields need to be public?
You don't seem to need Logging.
private var _transferred: Long = 0
// this duplicates the original chunks, so we're free to modify the position, limit, etc.
private val chunks = chunkedByteBuffer.getChunks()
private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
Use foldLeft(0) { blah } + avoid the intermediate val?
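For what it's worth, the trade-off between the two can be sketched with made-up chunk sizes (the numbers below are illustrative, not from the actual code): scanLeft preserves every running total, which is what a cumulative-offset array like cumLength needs, while foldLeft only yields the final total, so it suffices only where just the size is wanted.

```scala
// Illustrative only: chunk sizes are made up.
val chunkSizes = Seq(4L, 8L, 2L)

// scanLeft keeps every running total, useful for locating which chunk a
// given absolute position falls into.
val cumLength = chunkSizes.scanLeft(0L)(_ + _)   // 0, 4, 12, 14

// foldLeft keeps only the final total, enough when only the size is needed.
val size = chunkSizes.foldLeft(0L)(_ + _)        // 14
```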
while (keepGoing) {
  while (currentChunk.hasRemaining && keepGoing) {
    val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
    val originalPos = currentChunk.position()
Unused.
sorry, a bunch of leftover bits from earlier debugging. All cleaned up now.
LGTM.
private var _transferred: Long = 0
// this duplicates the original chunks, so we're free to modify the position, limit, etc.
private val chunks = chunkedByteBuffer.getChunks()
private val size = chunks.foldLeft(0) { _ + _.remaining()}
space before }
/**
 * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty
 * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion,
3 spaces
/**
 * This mocks a channel which only accepts a limited number of bytes at a time. It also verifies
 * the written data matches our expectations as the data is received.
 * @param maxWriteSize
remove
Test build #92399 has finished for PR 21440 at commit
Test build #92406 has finished for PR 21440 at commit
private var _transferred: Long = 0
// this duplicates the original chunks, so we're free to modify the position, limit, etc.
private val chunks = chunkedByteBuffer.getChunks()
private val size = chunks.foldLeft(0) { _ + _.remaining() }
0L? Otherwise this will overflow for > 2G right?
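The overflow the comment warns about is easy to demonstrate with made-up chunk sizes (illustrative only, not the actual code): an Int accumulator silently wraps past 2 GB, while a Long accumulator gives the correct total.

```scala
// Illustrative: chunk sizes chosen so they sum past Int.MaxValue (~2 GB).
val chunkSizes = Seq(Int.MaxValue, 1024)

// Long accumulator: correct total, slightly above 2 GB.
val sizeAsLong = chunkSizes.foldLeft(0L)(_ + _.toLong)

// Int accumulator: silently overflows and wraps around to a negative value.
val sizeAsInt = chunkSizes.foldLeft(0)(_ + _)
```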
Test build #92533 has finished for PR 21440 at commit
@tgravescs @felixcheung @zsxwing maybe one of you could take a look? I got a lgtm from marcelo but he's out for a few weeks, would prefer to get another approval, plus I'll need review on #21451
@mridulm @jerryshao maybe you would be interested in reviewing this as well?
LG
@@ -723,7 +728,9 @@ private[spark] class BlockManager(
}

if (data != null) {
  return Some(new ChunkedByteBuffer(data))
  val chunkSize =
    conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
nit: Make chunkSize a private field in BlockManager instead of recomputing it each time?
val chunks = new ListBuffer[ByteBuffer]()
while (remaining > 0) {
  val chunkSize = math.min(remaining, maxChunkSize)
  val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
Wondering if we could make these FileRegions instead, and use transferTo instead of write in ChunkedByteBufferFileRegion?
I'm not sure I understand. What FileRegion are you referring to -- the only one I know of is netty's interface. Do you mean implement another FileRegion for each chunk, and then have ChunkedByteBufferFileRegion delegate to that?
We could do that, but I don't think it would be any better. ChunkedByteBufferFileRegion.transferTo would be about as complex as it is now. Also, it may be worth noting that this particular method really should disappear -- we shouldn't be mapping this at all, we should be using an input stream (see the TODO above), but I want to do that separately.
I was thinking of DefaultFileRegion... but any other zero-copy impl should be fine.
I think your concern is that when we are going to send data that is backed by a file, eg. a remote read of an RDD cached on disk, we should be able to send it using something more efficient than memory mapping the entire file. Is that correct?
That actually isn't a problem. This map() method isn't called for sending disk-cached RDDs. That is already handled correctly with FileSegmentManagedBuffer.convertToNetty(), which uses the DefaultFileRegion you had in mind. The map method is only used on the receiving end, after the data has already been transferred, and just to pass the data on to other spark code locally in the executor. (And that will avoid the map() entirely after the TODO above.)
I needed to add ChunkedByteBufferFileRegion for data that is already in memory as a ChunkedByteBuffer, eg. for memory-cached RDDs.
Perfect, thanks for clarifying !
import org.apache.spark.network.util.ByteArrayWritableChannel
import org.apache.spark.storage.StorageUtils
import org.apache.spark.util.Utils
nit: this blank line seems unnecessary.
}

def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = {
  Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel =>
Can we please use FileChannel#open instead? FileInputStream/FileOutputStream have some issues (https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful)
I wasn't aware of that issue, thanks for sharing that, I'll update this. Should we also update other uses? Seems there are a lot of other cases, eg. UnsafeShuffleWriter, DiskBlockObjectWriter, etc.
I've already updated some of them in SPARK-21475 in the shuffle-related code path, but not the others, which are not so critical.
great, thanks for the explanation
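To make the suggestion above concrete, here is a hedged sketch of the chunked memory-mapping loop using FileChannel.open instead of new FileInputStream(file).getChannel(). The helper name mapInChunks is hypothetical, and it returns the raw chunks rather than a ChunkedByteBuffer, just to keep the sketch self-contained:

```scala
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import scala.collection.mutable.ListBuffer

// Hypothetical sketch: memory-map a file in chunks of at most maxChunkSize
// bytes, opening the channel via FileChannel.open as suggested above.
def mapInChunks(file: File, maxChunkSize: Int): Seq[ByteBuffer] = {
  val channel = FileChannel.open(file.toPath, StandardOpenOption.READ)
  try {
    val chunks = new ListBuffer[ByteBuffer]()
    var pos = 0L
    var remaining = channel.size()
    while (remaining > 0) {
      // each mapped region is capped at maxChunkSize, so no single
      // ByteBuffer ever exceeds 2 GB
      val chunkSize = math.min(remaining, maxChunkSize.toLong)
      chunks += channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
      pos += chunkSize
      remaining -= chunkSize
    }
    chunks.toSeq
  } finally {
    channel.close()
  }
}
```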
val thisWriteSize = target.write(currentChunk)
currentChunk.limit(originalLimit)
written += thisWriteSize
if (thisWriteSize < ioSize) {
What happens if thisWriteSize is smaller than ioSize? Will Spark throw an exception or do something else?
actually this is a totally normal condition, it just means the channel is not currently ready to accept any more data. This is something netty expects, and it will make sure the rest of the data is put on the channel eventually (transferTo will get called the next time with the correct position argument indicating how far along it is).
The added unit tests cover this.
I see, thanks for explaining.
LGTM, thanks for working on this @squito!
Test build #93234 has finished for PR 21440 at commit
LGTM. Merging to master.
Although the code quality is pretty good, I am still afraid it could introduce some unexpected issues. Would it be possible to introduce a conf to disable the new changes and use the previous implementation? We can remove the conf in the next release.
@gatorsmile sure, that's pretty easy. I'll submit a follow-up pr.
@squito Thank you!
(1) Netty's ByteBuf cannot support data > 2gb. So to transfer data from a
ChunkedByteBuffer over the network, we use a custom version of
FileRegion which is backed by the ChunkedByteBuffer.
(2) On the receiving end, we need to expose all the data in a
FileSegmentManagedBuffer as a ChunkedByteBuffer. We do that by memory
mapping the entire file in chunks.
Added unit tests. Ran the randomized test a couple of hundred times on my laptop. Tests cover the equivalent of SPARK-24107 for the ChunkedByteBufferFileRegion. Also tested on a cluster with remote cache reads >2gb (in memory and on disk).