[query] Broadcast with array of array byte streams #10766
danking merged 7 commits into hail-is:main
|
Woah, so fast! I think this is going to have poor performance until you implement the batched readers and writers, which look like:

    public void write(byte b[], int off, int len) throws IOException
    public int read(byte b[], int off, int len) throws IOException |
|
I agree that would be faster, but do we use those when we encode / decode broadcasted data? We should be if we aren't, but since it's working currently, my suspicion is that we aren't using them, unless there's a default implementation that works in terms of the single-byte versions? |
|
The gist suggests that, if anything, things got slightly faster after doing this, though admittedly the sentinels all got faster too: https://gist.github.com/johnc1231/cb10f22dad676bb30da680b9e2178614 |
|
This is really baffling. Maybe this code isn't really used in benchmarks in local mode? I would expect that every write/read from these buffers for SerializableRegionValue would go through the array/buffer read calls, because we use StreamBlockBuffer for the buffer spec:

    val wireSpec: BufferSpec = LEB128BufferSpec(
      BlockingBufferSpec(32 * 1024,
        LZ4SizeBasedBlockBufferSpec("fast", 32 * 1024,
          256,
          new StreamBlockBufferSpec)))

We won't call write/read on the output/input buffers except to write/read blocks. |
|
Maybe it's because So that comes baked in using the single byte |
Yeah, that default implementation should be WAY slower than an implementation that uses System.arraycopy, though. That's why I'm surprised you don't see performance regressions. |
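To make the point about the default implementation concrete, here is a minimal Java sketch. It relies on one documented fact: `java.io.OutputStream.write(byte[], int, int)` falls back to calling `write(int)` once per byte unless a subclass overrides it. The class names `SingleByteOnlyStream`, `BulkStream`, and `BulkWriteDemo` are hypothetical and are not Hail's classes; the growable buffer is only there so the two paths can be compared.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical stream that implements only the single-byte write, so bulk
// writes go through java.io.OutputStream's default per-byte loop.
class SingleByteOnlyStream extends OutputStream {
    byte[] data = new byte[16];
    int size = 0;
    int singleByteCalls = 0; // how many times write(int) was invoked

    void ensureCapacity(int needed) {
        if (needed > data.length) {
            data = Arrays.copyOf(data, Math.max(needed, data.length * 2));
        }
    }

    @Override
    public void write(int b) {
        singleByteCalls++;
        ensureCapacity(size + 1);
        data[size++] = (byte) b;
    }
}

// Same buffer, but the bulk write is overridden to copy the whole range with
// a single System.arraycopy instead of one write(int) call per byte.
class BulkStream extends SingleByteOnlyStream {
    @Override
    public void write(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        ensureCapacity(size + len);
        System.arraycopy(b, off, data, size, len);
        size += len;
    }
}

class BulkWriteDemo {
    // Writes the payload in one bulk call and reports how many single-byte
    // writes that triggered on the given stream.
    static int singleByteCallsFor(SingleByteOnlyStream out, byte[] payload) {
        try {
            out.write(payload, 0, payload.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.singleByteCalls;
    }
}
```

With a 1000-byte payload, `BulkWriteDemo.singleByteCallsFor(new SingleByteOnlyStream(), payload)` reports 1000 calls and the `BulkStream` variant reports 0. Whether that difference shows up in a benchmark depends on how often the bulk path is actually exercised, which is the open question in this thread.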
d5b52d8 to 87f60ac
|
Ok, wrote the multi-byte versions, running benchmarks. |
|
https://gist.github.com/johnc1231/7667f228d022c636d42924dbd181bc96 Seems like not a lot of change, though it's definitely trending slower instead of trending faster the way the earlier one did. |
tpoterba left a comment
one comment and ready to go
    buf += new ByteArrayOutputStream(initialBufferCapacity)

    protected var bytesInCurrentArray = 0
    protected var currentArray = 0
instead of having a currentArray index, can we just have a currentBuilder: ByteArrayOutputStream? I think having to do a lookup by index in the BoxedArrayBuilder in buffer might be a performance hit, and the code will probably be simpler regardless.
…nce of ByteArrayOutputStream
…ue to use them, now upper limit on data size is effectively removed
…uilder to avoid lookups
87f60ac to d35e646
|
Addressed, want me to benchmark again? |
|
nah, ship it. |
I ran into issues when broadcasting a very large struct of ndarrays for a huge linear regression, where the total size was more than MAX_INT bytes. To solve this, I've changed SerializableRegionValue to use ArrayOfByteArrayOutputStream and ArrayOfByteInputStream, which create nested arrays of bytes instead of just one array, removing any maximum length issues. PRing now for tests, also running benchmarks.
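The nested-array idea can be sketched in Java as follows. This is an illustrative sketch, not the PR's implementation: the names `ChunkedByteOutputStream` and `ChunkedByteInputStream`, the fixed chunk size, and the `totalSize` helper are all assumptions. It stores data in a list of fixed-size `byte[]` chunks, so no single array has to hold more than `chunkSize` bytes, and it keeps a direct reference to the chunk being filled rather than looking it up by index on every write.

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical output stream backed by a list of fixed-size chunks.
class ChunkedByteOutputStream extends OutputStream {
    private final int chunkSize;
    private final List<byte[]> chunks = new ArrayList<>();
    private byte[] current;    // direct reference to the chunk being filled
    private int usedInCurrent; // bytes already written into `current`

    ChunkedByteOutputStream(int chunkSize) {
        if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be positive");
        this.chunkSize = chunkSize;
        startNewChunk();
    }

    private void startNewChunk() {
        current = new byte[chunkSize];
        chunks.add(current);
        usedInCurrent = 0;
    }

    @Override
    public void write(int b) {
        if (usedInCurrent == chunkSize) startNewChunk();
        current[usedInCurrent++] = (byte) b;
    }

    @Override
    public void write(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || len > b.length - off) throw new IndexOutOfBoundsException();
        while (len > 0) {
            if (usedInCurrent == chunkSize) startNewChunk();
            int n = Math.min(len, chunkSize - usedInCurrent);
            System.arraycopy(b, off, current, usedInCurrent, n);
            usedInCurrent += n;
            off += n;
            len -= n;
        }
    }

    // The total is a long, so it can exceed Integer.MAX_VALUE.
    long totalSize() {
        return (long) (chunks.size() - 1) * chunkSize + usedInCurrent;
    }

    ChunkedByteInputStream toInputStream() {
        return new ChunkedByteInputStream(chunks, chunkSize, usedInCurrent);
    }
}

// Hypothetical reader over the same chunk list; only the last chunk may be partial.
class ChunkedByteInputStream extends InputStream {
    private final List<byte[]> chunks;
    private final int chunkSize;
    private final int usedInLast;
    private int chunkIdx = 0;
    private int posInChunk = 0;

    ChunkedByteInputStream(List<byte[]> chunks, int chunkSize, int usedInLast) {
        this.chunks = chunks;
        this.chunkSize = chunkSize;
        this.usedInLast = usedInLast;
    }

    private int limitOfCurrent() {
        return chunkIdx == chunks.size() - 1 ? usedInLast : chunkSize;
    }

    // Moves to the next chunk with unread data; returns false at end of stream.
    private boolean advance() {
        while (posInChunk == limitOfCurrent()) {
            if (chunkIdx == chunks.size() - 1) return false;
            chunkIdx++;
            posInChunk = 0;
        }
        return true;
    }

    @Override
    public int read() {
        return advance() ? (chunks.get(chunkIdx)[posInChunk++] & 0xFF) : -1;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || len > b.length - off) throw new IndexOutOfBoundsException();
        if (len == 0) return 0;
        int copied = 0;
        while (copied < len && advance()) {
            int n = Math.min(len - copied, limitOfCurrent() - posInChunk);
            System.arraycopy(chunks.get(chunkIdx), posInChunk, b, off + copied, n);
            posInChunk += n;
            copied += n;
        }
        return copied == 0 ? -1 : copied;
    }
}
```

A round trip with a small chunk size, for example `new ChunkedByteOutputStream(4)`, followed by a 10-byte bulk write and `toInputStream()`, exercises the chunk-boundary handling in both directions. A production version would presumably use much larger chunks so that most bulk copies stay within a single array.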