Skip to content

Commit

Permalink
add put and get
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz committed Aug 26, 2017
1 parent 44ada47 commit e1924a4
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 8 deletions.
9 changes: 9 additions & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,15 @@ cdef class NativeFile:
cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader)
cdef get_writer(object source, shared_ptr[OutputStream]* writer)

# C-level declaration of SerializedPyObject (described in serialization.pxi
# as the "Arrow-serialized representation of Python object"). Declaring the
# attributes here in the .pxd makes the class cimport-able from other Cython
# modules.
cdef class SerializedPyObject:
cdef:
# The underlying C-level CSerializedPyObject value wrapped by this class.
CSerializedPyObject data

cdef readonly:
# Python object exposed read-only to Python code.
# NOTE(review): presumably keeps the memory backing `data` alive -- confirm.
object base

# C-only helper taking a raw OutputStream pointer; by its name it writes the
# serialized data to `stream` (implementation is not in this declaration).
cdef _write_to(self, OutputStream* stream)

cdef public object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf)
cdef public object pyarrow_wrap_data_type(const shared_ptr[CDataType]& type)
cdef public object pyarrow_wrap_field(const shared_ptr[CField]& field)
Expand Down
20 changes: 18 additions & 2 deletions python/pyarrow/plasma.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ from libcpp.vector cimport vector as c_vector
from libc.stdint cimport int64_t, uint8_t, uintptr_t
from cpython.pycapsule cimport *

from pyarrow.lib cimport Buffer, NativeFile, check_status
import pyarrow

from pyarrow.lib cimport Buffer, NativeFile, check_status, SerializedPyObject
from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer,
CFixedSizeBufferWriter, CStatus)

Expand Down Expand Up @@ -296,7 +298,7 @@ cdef class PlasmaClient:
----------
object_ids : list
A list of ObjectIDs used to identify some objects.
timeout_ms :int
timeout_ms : int
The number of milliseconds that the get call should block before
timing out and returning. Pass -1 if the call should block and 0
if the call should return immediately.
Expand Down Expand Up @@ -588,3 +590,17 @@ def connect(store_socket_name, manager_socket_name, int release_delay,
result.manager_socket_name,
release_delay, num_retries))
return result

def put(PlasmaClient client, value):
    """
    Serialize a Python object and store it in the Plasma store.

    Parameters
    ----------
    client : PlasmaClient
        The client connected to the store that should hold the object.
    value : object
        The Python object to serialize with ``pyarrow.serialize``.

    Returns
    -------
    ObjectID
        The randomly generated ID under which the object was stored.
    """
    cdef ObjectID object_id = ObjectID.from_random()
    cdef SerializedPyObject serialized = pyarrow.serialize(value)
    buffer = client.create(object_id, serialized.total_bytes)
    # NOTE(review): write_to() may require a writable stream rather than a
    # raw buffer (e.g. a fixed-size buffer writer wrapping `buffer`) --
    # confirm against SerializedPyObject.write_to.
    serialized.write_to(buffer)
    # A created object only becomes visible to get() once it is sealed;
    # without this call consumers would never be able to retrieve it.
    client.seal(object_id)
    return object_id

def get(PlasmaClient client, object_ids, timeout_ms=-1):
    """
    Fetch objects from the Plasma store and deserialize them.

    Parameters
    ----------
    client : PlasmaClient
        The client connected to the store holding the objects.
    object_ids : list
        A list of ObjectIDs used to identify some objects.
    timeout_ms : int
        The number of milliseconds that the get call should block before
        timing out and returning. Pass -1 if the call should block and 0
        if the call should return immediately.

    Returns
    -------
    list
        One entry per requested ID, in order: the deserialized Python
        object, or None when the object was not available within the
        timeout. A stored value of None is indistinguishable from a
        missing object.
    """
    buffers = client.get(object_ids, timeout_ms)
    # client.get yields None for objects that were not available in time;
    # pass that through instead of handing None to pyarrow.deserialize.
    return [
        None if buf is None else pyarrow.deserialize(buf)
        for buf in buffers
    ]
5 changes: 0 additions & 5 deletions python/pyarrow/serialization.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,6 @@ cdef class SerializedPyObject:
"""
Arrow-serialized representation of Python object
"""
cdef:
CSerializedPyObject data

cdef readonly:
object base

property total_bytes:

Expand Down
8 changes: 7 additions & 1 deletion python/pyarrow/tests/test_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
"""
if use_valgrind and use_profiler:
raise Exception("Cannot use valgrind and profiler at the same time.")
plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store")
plasma_store_executable = "plasma_store"
plasma_store_name = "/tmp/plasma_store{}".format(random_name())
command = [plasma_store_executable,
"-s", plasma_store_name,
Expand Down Expand Up @@ -273,6 +273,12 @@ def test_get(self):
else:
assert results[i] is None

def test_put_and_get(self):
    """Round-trip a small heterogeneous list through plasma put/get."""
    original = ["hello", "world", 3, 1.0]
    stored_id = pa.plasma.put(self.plasma_client, original)
    fetched = pa.plasma.get(self.plasma_client, [stored_id])
    # Destructuring requires that exactly one value came back.
    [roundtripped] = fetched
    assert roundtripped == original

def test_store_arrow_objects(self):
data = np.random.randn(10, 4)
# Write an arrow object.
Expand Down

0 comments on commit e1924a4

Please sign in to comment.