-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-6317: [JS] Implement IPC message format alignment changes #5225
ARROW-6317: [JS] Implement IPC message format alignment changes #5225
Conversation
@wesm C++ <-> JS integration tests are passing: |
@TheNeuralBit do you have time to review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just some minor comments. Thanks Paul!
@@ -77,7 +80,7 @@ export class MessageReader implements IterableIterator<Message> { | |||
const buf = this.source.read(PADDING); | |||
const bb = buf && new ByteBuffer(buf); | |||
const len = +(bb && bb.readInt32(0))!; | |||
return { done: len <= 0, value: len }; | |||
return { done: len === 0, value: len }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have some kind of assertion somewhere that verifies length > 0? What happens if a corrupt file has that value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That’s effectively handled in the subsequent call. We use the length read here as the number of bytes to consume from the stream, which should be the flatbuffer metadata.
If the length is incorrect, the flatbuffer metadata created from the buffer will be empty. If the length is too long, the stream will read as many bytes as it can, and close automatically. Both cases are interpreted by the stream reader as bad input, and ignored.
@@ -104,6 +107,9 @@ export class AsyncMessageReader implements AsyncIterableIterator<Message> { | |||
public async next(): Promise<IteratorResult<Message>> { | |||
let r; | |||
if ((r = await this.readMetadataLength()).done) { return ITERATOR_DONE; } | |||
// ARROW-6313: If the first 4 bytes are continuation message, read the next 4 for the length | |||
if ((r.value === -1) && | |||
(r = await this.readMetadataLength()).done) { return ITERATOR_DONE; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's too bad we have to duplicate this code. Could the readers be combined somehow? Not in this PR, just curious
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately not. Async functions are implemented via promises, which don’t allow parameterizing the scheduler. Async functions force the caller to handle the returned promise, guaranteeing the function doesn’t finish its work synchronously, even if it could.
This is a key difference between JS promises and Python’s asyncio module, which does execute synchronously if no actual async work is performed.
@TheNeuralBit just pushed up some changes to add a |
Is this ready for merging? |
@fsaintjacques I believe so yes |
According to the mailing list it seems that we need to change the EOS marker size too. I'll wait for this to settle first. |
I'll make a Format patch today with the changes and change the C++ library, the change shouldn't be too complex |
I'm testing out the new EOS bytes change in JS right now, will push a commit here in a bit if it works. |
Awesome, thanks! |
4f9b887
to
0352456
Compare
@wesm have the C++ changes for the new 8-byte eos message been pushed to the integration branch? If so, I can pull locally and validate against the JS changes. |
@trxcllnt I just pushed the change to the branch, so you can rebase and test |
I just rebased |
It looks like only the EOS writing is probably all that needs to be changed |
@wesm I pulled the latest from this branch and rebuilt everything, but getting errors in the integration runner: C++ producing, C++ consuming========================================================== Testing file /home/ptaylor/dev/arrow/integration/data/struct_example.json ========================================================== -- Creating binary inputs -- Validating file -- Validating stream Segmentation fault Traceback (most recent call last): File "integration/integration_test.py", line 1284, in _compare_implementations run_binaries(producer, consumer, test_case) File "integration/integration_test.py", line 1315, in _produce_consume consumer.stream_to_file(producer_stream_path, consumer_file_path) File "integration/integration_test.py", line 1554, in stream_to_file self.run_shell_command(cmd) File "integration/integration_test.py", line 1389, in run_shell_command subprocess.check_call(cmd, shell=True) File "/usr/lib/python3.6/subprocess.py", line 311, in check_call raise CalledProcessError(retcode, cmd) subprocess.CalledProcessError: Command 'cat /tmp/tmpbp24i1lj/4ee47ec3_struct_example.producer_file_as_stream | /home/ptaylor/dev/arrow/cpp/build/debug/arrow-stream-to-file > /tmp/tmpbp24i1lj/4ee47ec3_struct_example.consumer_stream_as_file' returned non-zero exit status 139. JS producing, C++ consuming========================================================== Testing file /home/ptaylor/dev/arrow/integration/data/struct_example.json ========================================================== -- Creating binary inputs -- Validating file -- Validating stream Segmentation fault Traceback (most recent call last): File "integration/integration_test.py", line 1284, in _compare_implementations run_binaries(producer, consumer, test_case) File "integration/integration_test.py", line 1315, in _produce_consume consumer.stream_to_file(producer_stream_path, consumer_file_path) File "integration/integration_test.py", line 1554, in stream_to_file self.run_shell_command(cmd) File "integration/integration_test.py", line 1389, in run_shell_command subprocess.check_call(cmd, shell=True) File "/usr/lib/python3.6/subprocess.py", line 311, in check_call raise CalledProcessError(retcode, cmd) subprocess.CalledProcessError: Command 'cat /tmp/tmpbp24i1lj/5f87e545_struct_example.producer_file_as_stream | /home/ptaylor/dev/arrow/cpp/build/debug/arrow-stream-to-file > /tmp/tmpbp24i1lj/5f87e545_struct_example.consumer_stream_as_file' returned non-zero exit status 139. |
19c5e0f
to
b69bc0e
Compare
@wesm nevermind, looks like it was a bad rebase. I just re-branched from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 Awesome, thanks.
Implements the IPC message format alignment changes for [ARROW-6313](https://issues.apache.org/jira/browse/ARROW-6313). In this PR the `MessageReader` can read messages with the old alignment, but `RecordBatchWriter` always produces messages with the new alignment. I can add a flag if others think it'd be useful to produce messages in the old format. Closes #5225 from trxcllnt/ARROW-6314 and squashes the following commits: b69bc0e <ptaylor> update ipc reader and writer for ARROW-6313 Authored-by: ptaylor <paul.e.taylor@me.com> Signed-off-by: Wes McKinney <wesm+git@apache.org>
Implements the IPC message format alignment changes for [ARROW-6313](https://issues.apache.org/jira/browse/ARROW-6313). In this PR the `MessageReader` can read messages with the old alignment, but `RecordBatchWriter` always produces messages with the new alignment. I can add a flag if others think it'd be useful to produce messages in the old format. Closes #5225 from trxcllnt/ARROW-6314 and squashes the following commits: b69bc0e <ptaylor> update ipc reader and writer for ARROW-6313 Authored-by: ptaylor <paul.e.taylor@me.com> Signed-off-by: Wes McKinney <wesm+git@apache.org>
Implements the IPC message format alignment changes for [ARROW-6313](https://issues.apache.org/jira/browse/ARROW-6313). In this PR the `MessageReader` can read messages with the old alignment, but `RecordBatchWriter` always produces messages with the new alignment. I can add a flag if others think it'd be useful to produce messages in the old format. Closes apache#5225 from trxcllnt/ARROW-6314 and squashes the following commits: b69bc0e <ptaylor> update ipc reader and writer for ARROW-6313 Authored-by: ptaylor <paul.e.taylor@me.com> Signed-off-by: Wes McKinney <wesm+git@apache.org>
Implements the IPC message format alignment changes for ARROW-6313. In this PR the
MessageReader
can read messages with the old alignment, butRecordBatchWriter
always produces messages with the new alignment. I can add a flag if others think it'd be useful to produce messages in the old format.