From e1924a45f5024de589d6ec71436d57acf42f64db Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 25 Aug 2017 22:51:49 -0700 Subject: [PATCH] add put and get --- python/pyarrow/lib.pxd | 9 +++++++++ python/pyarrow/plasma.pyx | 20 ++++++++++++++++++-- python/pyarrow/serialization.pxi | 5 ----- python/pyarrow/tests/test_plasma.py | 8 +++++++- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 48a58f7b82660..c377be37e20f7 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -330,6 +330,15 @@ cdef class NativeFile: cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader) cdef get_writer(object source, shared_ptr[OutputStream]* writer) +cdef class SerializedPyObject: + cdef: + CSerializedPyObject data + + cdef readonly: + object base + + cdef _write_to(self, OutputStream* stream) + cdef public object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf) cdef public object pyarrow_wrap_data_type(const shared_ptr[CDataType]& type) cdef public object pyarrow_wrap_field(const shared_ptr[CField]& field) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index befa283d85b54..9e094ac1bfac4 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -26,7 +26,9 @@ from libcpp.vector cimport vector as c_vector from libc.stdint cimport int64_t, uint8_t, uintptr_t from cpython.pycapsule cimport * -from pyarrow.lib cimport Buffer, NativeFile, check_status +import pyarrow + +from pyarrow.lib cimport Buffer, NativeFile, check_status, SerializedPyObject from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer, CFixedSizeBufferWriter, CStatus) @@ -296,7 +298,7 @@ cdef class PlasmaClient: ---------- object_ids : list A list of ObjectIDs used to identify some objects. - timeout_ms :int + timeout_ms : int The number of milliseconds that the get call should block before timing out and returning. Pass -1 if the call should block and 0 if the call should return immediately. @@ -588,3 +590,17 @@ def connect(store_socket_name, manager_socket_name, int release_delay, result.manager_socket_name, release_delay, num_retries)) return result + +def put(PlasmaClient client, value): + cdef ObjectID object_id = ObjectID.from_random() + cdef SerializedPyObject serialized = pyarrow.serialize(value) + buffer = client.create(object_id, serialized.total_bytes) + serialized.write_to(buffer) + return object_id + +def get(PlasmaClient client, object_ids, timeout_ms=-1): + results = [] + buffers = client.get(object_ids, timeout_ms) + for buffer in buffers: + results.append(pyarrow.deserialize(buffer)) + return results diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index 3ee34eed5ed95..a38a527d3e594 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -135,11 +135,6 @@ cdef class SerializedPyObject: """ Arrow-serialized representation of Python object """ - cdef: - CSerializedPyObject data - - cdef readonly: - object base property total_bytes: diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 04162bbbbade7..2b4e063b2366e 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -119,7 +119,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, """ if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") - plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store") + plasma_store_executable = "plasma_store" plasma_store_name = "/tmp/plasma_store{}".format(random_name()) command = [plasma_store_executable, "-s", plasma_store_name, @@ -273,6 +273,12 @@ def test_get(self): else: assert results[i] is None + def test_put_and_get(self): + value = ["hello", "world", 3, 1.0] + object_id = pa.plasma.put(self.plasma_client, value) + [result] = pa.plasma.get(self.plasma_client, [object_id]) + assert result == value + def test_store_arrow_objects(self): data = np.random.randn(10, 4) # Write an arrow object.