From 3518c71a1c7c9fa3d3ee20264bdc09c906405ec4 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 26 Aug 2017 16:09:21 -0700 Subject: [PATCH] fix --- python/pyarrow/__init__.py | 4 +++- python/pyarrow/plasma.pyx | 12 +++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index abbc125c5adb9..3e269d13e52ec 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -85,12 +85,14 @@ ArrowIOError, ArrowMemoryError, ArrowNotImplementedError, - ArrowTypeError) + ArrowTypeError, + PlasmaObjectExists) # Serialization from pyarrow.lib import (deserialize_from, deserialize, serialize, serialize_to, read_serialized, SerializedPyObject, + SerializationException, DeserializationException, # This is temporary register_type, type_to_type_id, whitelisted_types, types_to_pickle, custom_serializers, custom_deserializers) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 9e094ac1bfac4..51300039a4436 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -591,12 +591,14 @@ def connect(store_socket_name, manager_socket_name, int release_delay, release_delay, num_retries)) return result -def put(PlasmaClient client, value): - cdef ObjectID object_id = ObjectID.from_random() +def put(PlasmaClient client, value, object_id=None): + cdef ObjectID id = object_id if object_id else ObjectID.from_random() cdef SerializedPyObject serialized = pyarrow.serialize(value) - buffer = client.create(object_id, serialized.total_bytes) - serialized.write_to(buffer) - return object_id + buffer = client.create(id, serialized.total_bytes) + stream = pyarrow.FixedSizeBufferOutputStream(buffer) + stream.set_memcopy_threads(4) + serialized.write_to(stream) + return id def get(PlasmaClient client, object_ids, timeout_ms=-1): results = []