Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3587: [Python] Efficient serialization for Arrow Objects (array, table, tensor, etc) #2832

Closed
wants to merge 2 commits into from

Conversation

suquark
Copy link
Contributor

@suquark suquark commented Oct 24, 2018

This PR enables efficient serialization for Arrow Objects (array, table, tensor, record batch).

Copy link
Contributor

@pcmoritz pcmoritz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 can be merged when the tests pass

@dhirschfeld
Copy link

I'm curious how this relates to #2292 and #2161?

@suquark
Copy link
Contributor Author

suquark commented Oct 25, 2018

@dhirschfeld #2292 and #2161 focused on pickling support while this PR focus on serialization for plasma store. Plasma store needs plasma.serialize & plasma.deserialize to store objects or reconstruct objects.

@wesm
Copy link
Member

wesm commented Oct 25, 2018

Can you hold off on merging for me to take a look

@pcmoritz
Copy link
Contributor

Yeah, it also looks like there is currently a segfault in the IPC test

@suquark
Copy link
Contributor Author

suquark commented Oct 26, 2018

@pcmoritz I have fixed the bug. It is a potential bug remained in the previous version, where they forgot to align the stream before writing new blocks to it. It's lucky that in the previous version, it happened to be aligned. But after I created new fields, its alignment would depend on the compiler, so only certain compilers failed in CI.

RETURN_NOT_OK(
src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_buffers)));

// Align stream to 8-byte offset
RETURN_NOT_OK(ipc::AlignStream(src, ipc::kArrowIpcAlignment));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to look more closely at this before this patch is merged. I spent a bunch of time in c9ac869 on this and so I want to make sure that stream alignment is happening at the highest level possible rather than leaking into lower-level implementation details

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wesm Let me know about your thoughts on this, it seems that handling the alignment here is analogous to how the alignment for Tensors is handled down below in correspondence with c9ac869, so this looks good to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should probably be changed to take InputStream* src as input and handle alignment one level higher. I'm going to check out this branch and take a look

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a closer look and we can leave as is (the extra 4 bytes at the start of the object made the need for the alignment)

@pcmoritz
Copy link
Contributor

@suquark Great find, thanks for looking into this!

@wesm Yeah, please let us know what you think the best solution here is

@pcmoritz
Copy link
Contributor

@wesm: Let me know if I can help with that, it would be good to get this merged. If you have a high level idea on where the alignment should be done let me know.

@wesm
Copy link
Member

wesm commented Oct 29, 2018

I'm on the road and struggling with review bandwidth. I should be able to get to it in the next few days.

What's it going to take to get Ray running against released versions of Arrow? We could really use your assistance with release management.

@pcmoritz
Copy link
Contributor

Thanks, regarding the PR that sounds good!

Regarding the second question, I created an issue to track it here: ray-project/ray#3162

@wesm
Copy link
Member

wesm commented Nov 7, 2018

I will review today, am finished traveling for now so my review bandwidth should be coming back up

@wesm wesm changed the title [WIP] ARROW-3587: [Python] Efficient serialization for Arrow Objects (array, table, tensor, etc) ARROW-3587: [Python] Efficient serialization for Arrow Objects (array, table, tensor, etc) Nov 8, 2018
Copy link
Member

@wesm wesm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good. I'm going to check out the branch and see if I can make the alignment change I described

RETURN_NOT_OK(
src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_buffers)));

// Align stream to 8-byte offset
RETURN_NOT_OK(ipc::AlignStream(src, ipc::kArrowIpcAlignment));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should probably be changed to take InputStream* src as input and handle alignment one level higher. I'm going to check out this branch and take a look

Change-Id: I4d8f0f1b5e33fc722456a21a272bc1a8ea07f918
@wesm
Copy link
Member

wesm commented Nov 8, 2018

I just squashed and rebased this since it was not rebasing cleanly

Copy link
Member

@wesm wesm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Will merge once the build is passing

@wesm
Copy link
Member

wesm commented Nov 8, 2018

Hm, seeing this error

______________________________ test_plasma_tf_op _______________________________
use_gpu = False
    @pytest.mark.plasma
    @pytest.mark.tensorflow
    def test_plasma_tf_op(use_gpu=False):
        import pyarrow.plasma as plasma
        import tensorflow as tf
    
        plasma.build_plasma_tensorflow_op()
    
        if plasma.tf_plasma_op is None:
            pytest.skip("TensorFlow Op not found")
    
        with plasma.start_plasma_store(10**8) as (plasma_store_name, p):
            client = plasma.connect(plasma_store_name, "", 0)
            for dtype in [np.float32, np.float64,
                          np.int8, np.int16, np.int32, np.int64]:
                run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
>                                              client, use_gpu, dtype)
pyarrow/tests/test_plasma_tf_op.py:101: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tf = <module 'tensorflow' from '/home/travis/build/apache/arrow/pyarrow-test-2.7/lib/python2.7/site-packages/tensorflow/__init__.pyc'>
plasma = <module 'pyarrow.plasma' from '/home/travis/build/apache/arrow/python/pyarrow/plasma.pyc'>
plasma_store_name = '/tmp/test_plasma-QwnLmF/plasma.sock'
client = <pyarrow._plasma.PlasmaClient object at 0x7f96d96123f0>
use_gpu = False, dtype = <type 'numpy.float32'>
    def run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
                                       client, use_gpu, dtype):
        FORCE_DEVICE = '/gpu' if use_gpu else '/cpu'
    
        object_id = np.random.bytes(20)
    
        data = np.random.randn(3, 244, 244).astype(dtype)
        ones = np.ones((3, 244, 244)).astype(dtype)
    
        sess = tf.Session(config=tf.ConfigProto(
            allow_soft_placement=True, log_device_placement=True))
    
        def ToPlasma():
            data_tensor = tf.constant(data)
            ones_tensor = tf.constant(ones)
            return plasma.tf_plasma_op.tensor_to_plasma(
                [data_tensor, ones_tensor],
                object_id,
                plasma_store_socket_name=plasma_store_name,
                plasma_manager_socket_name="")
    
        def FromPlasma():
            return plasma.tf_plasma_op.plasma_to_tensor(
                object_id,
                dtype=tf.as_dtype(dtype),
                plasma_store_socket_name=plasma_store_name,
                plasma_manager_socket_name="")
    
        with tf.device(FORCE_DEVICE):
            to_plasma = ToPlasma()
            from_plasma = FromPlasma()
    
        z = from_plasma + 1
    
        sess.run(to_plasma)
        # NOTE(zongheng): currently it returns a flat 1D tensor.
        # So reshape manually.
        out = sess.run(from_plasma)
    
        out = np.split(out, 2)
        out0 = out[0].reshape(3, 244, 244)
        out1 = out[1].reshape(3, 244, 244)
    
        sess.run(z)
    
        assert np.array_equal(data, out0), "Data not equal!"
        assert np.array_equal(ones, out1), "Data not equal!"
    
        # Try getting the data from Python
        plasma_object_id = plasma.ObjectID(object_id)
        obj = client.get(plasma_object_id)
    
        # Deserialized Tensor should be 64-byte aligned.
>       assert obj.ctypes.data % 64 == 0
E       AttributeError: 'pyarrow.lib.Tensor' object has no attribute 'ctypes'
pyarrow/tests/test_plasma_tf_op.py:75: AttributeError

Seems it might be related

@suquark
Copy link
Contributor Author

suquark commented Nov 8, 2018

Yes. I am trying to find out why it fails.

@suquark
Copy link
Contributor Author

suquark commented Nov 8, 2018

This bug is caused by a previous inconsistent where you put pa.Tensor to plasma, you get a numpy array instead. Since we have changed this in this PR (tensor to tensor, numpy to numpy), this test is outdated.

@wesm
Copy link
Member

wesm commented Nov 8, 2018

BTW I had force pushed your branch so things got a bit screwy now. You should try to never use 'git merge' if you can avoid it and stick to rebase / rebase -i

@codecov-io
Copy link

Codecov Report

Merging #2832 into master will increase coverage by 1.14%.
The diff coverage is 95.28%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #2832      +/-   ##
==========================================
+ Coverage   87.39%   88.54%   +1.14%     
==========================================
  Files         416      348      -68     
  Lines       65195    60127    -5068     
==========================================
- Hits        56977    53237    -3740     
+ Misses       8124     6890    -1234     
+ Partials       94        0      -94
Impacted Files Coverage Δ
cpp/src/arrow/python/serialize.h 0% <ø> (ø) ⬆️
python/pyarrow/tests/test_plasma_tf_op.py 97.87% <100%> (+0.04%) ⬆️
python/pyarrow/serialization.py 86.2% <100%> (+4.81%) ⬆️
python/pyarrow/serialization.pxi 75.9% <100%> (+0.14%) ⬆️
cpp/src/arrow/python/serialize.cc 89.95% <92%> (+0.3%) ⬆️
python/pyarrow/tests/test_serialization.py 90.39% <92.85%> (+0.08%) ⬆️
cpp/src/arrow/python/deserialize.cc 91.7% <94.11%> (+0.49%) ⬆️
rust/src/record_batch.rs
go/arrow/array/table.go
rust/src/array.rs
... and 67 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 3ab4a0f...a608d1b. Read the comment docs.

@suquark
Copy link
Contributor Author

suquark commented Nov 8, 2018

OK. I rebased the branch. There should be no problem now.

@suquark
Copy link
Contributor Author

suquark commented Nov 8, 2018

Ready to merge. The error is irrelevant (gandiva's benchmark could take more time because CI performance is not stable).

@pcmoritz pcmoritz closed this in d290538 Nov 8, 2018
@pcmoritz
Copy link
Contributor

pcmoritz commented Nov 8, 2018

Awesome, thanks! Can you create an account in the Arrow JIRA so this can be assigned to you?

@suquark
Copy link
Contributor Author

suquark commented Nov 8, 2018

@pcmoritz I am the reporter of https://issues.apache.org/jira/browse/ARROW-3587

@pcmoritz
Copy link
Contributor

pcmoritz commented Nov 8, 2018

Hm, yeah but somehow I can't assign you. Maybe @jacques-n has to add you to the ARROW project?

@kou
Copy link
Member

kou commented Nov 8, 2018

We need to assign the user to "contributes" role at https://issues.apache.org/jira/plugins/servlet/project-config/ARROW/roles .
I've added @suquark and assigned him to the issue. Thanks!

@pcmoritz
Copy link
Contributor

pcmoritz commented Nov 8, 2018

Ah ok thanks! For me it says "You cannot edit the configuration of this project.".

@kou
Copy link
Member

kou commented Nov 8, 2018

Ah, a PMC member needs to do it.

@pcmoritz
Copy link
Contributor

pcmoritz commented Nov 8, 2018

Hm, I'm a PMC member but maybe JIRA doesn't know about that ;)

@kou
Copy link
Member

kou commented Nov 8, 2018

I've added "Administrators" role to you. :-)

@pcmoritz
Copy link
Contributor

pcmoritz commented Nov 8, 2018

Cool, thanks! Also thanks for the explanation, in the past I never understood why I couldn't assign issues to people even though they had a JIRA account :)

robertnishihara pushed a commit to ray-project/ray that referenced this pull request Dec 2, 2018
This includes a fix so the TensorFlow op releases memory properly (apache/arrow#3061) and the possibility to store arrow data structures in plasma (apache/arrow#2832).

#3404
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants