From a9522c516250656536ec453425ff3d0529ef2474 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 19 Aug 2017 14:00:03 -0400 Subject: [PATCH] Refactoring, address code review comments. fix flake8 issues Change-Id: I9ced59de48f169b6609dd27f8239ceb22fd5ebeb --- cpp/src/arrow/builder.h | 12 +- cpp/src/arrow/python/arrow_to_python.cc | 16 +- cpp/src/arrow/python/arrow_to_python.h | 26 ++- cpp/src/arrow/python/python_to_arrow.cc | 246 +++++++++++---------- cpp/src/arrow/python/python_to_arrow.h | 29 ++- python/pyarrow/includes/libarrow.pxd | 40 ++-- python/pyarrow/serialization.pxi | 28 ++- python/pyarrow/tests/test_serialization.py | 33 ++- 8 files changed, 238 insertions(+), 192 deletions(-) diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 46900fc7129c1..3b851f92c1726 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -244,7 +244,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder { using PrimitiveBuilder::Reserve; /// Append a single scalar and increase the size if necessary. - Status Append(value_type val) { + Status Append(const value_type val) { RETURN_NOT_OK(ArrayBuilder::Reserve(1)); UnsafeAppend(val); return Status::OK(); @@ -255,7 +255,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder { /// /// This method does not capacity-check; make sure to call Reserve /// beforehand. - void UnsafeAppend(value_type val) { + void UnsafeAppend(const value_type val) { BitUtil::SetBit(null_bitmap_data_, length_); raw_data_[length_++] = val; } @@ -371,7 +371,7 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase using ArrayBuilder::Advance; /// Scalar append - Status Append(uint64_t val) { + Status Append(const uint64_t val) { RETURN_NOT_OK(Reserve(1)); BitUtil::SetBit(null_bitmap_data_, length_); @@ -430,7 +430,7 @@ class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase using ArrayBuilder::Advance; /// Scalar append - Status Append(int64_t val) { + Status Append(const int64_t val) { RETURN_NOT_OK(Reserve(1)); BitUtil::SetBit(null_bitmap_data_, length_); @@ -511,7 +511,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { std::shared_ptr data() const { return data_; } /// Scalar append - Status Append(bool val) { + Status Append(const bool val) { RETURN_NOT_OK(Reserve(1)); BitUtil::SetBit(null_bitmap_data_, length_); if (val) { @@ -523,7 +523,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { return Status::OK(); } - Status Append(uint8_t val) { return Append(val != 0); } + Status Append(const uint8_t val) { return Append(val != 0); } /// Vector append /// diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc index 4b218af648fb9..622ef8299374a 100644 --- a/cpp/src/arrow/python/arrow_to_python.cc +++ b/cpp/src/arrow/python/arrow_to_python.cc @@ -189,9 +189,8 @@ Status DeserializeTuple(std::shared_ptr array, int64_t start_idx, int64_t DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM) } -Status ReadSerializedPythonSequence(std::shared_ptr src, - std::shared_ptr* batch_out, - std::vector>* tensors_out) { +Status ReadSerializedObject(std::shared_ptr src, + SerializedPyObject* out) { std::shared_ptr reader; int64_t offset; int64_t bytes_read; @@ -200,23 +199,22 @@ Status ReadSerializedPythonSequence(std::shared_ptr src, RETURN_NOT_OK( src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast(&num_tensors))); RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader)); - RETURN_NOT_OK(reader->ReadNextRecordBatch(batch_out)); + RETURN_NOT_OK(reader->ReadNextRecordBatch(&out->batch)); RETURN_NOT_OK(src->Tell(&offset)); offset += 4; // Skip the end-of-stream message for (int i = 0; i < num_tensors; ++i) { std::shared_ptr tensor; RETURN_NOT_OK(ipc::ReadTensor(offset, src.get(), &tensor)); - tensors_out->push_back(tensor); + out->tensors.push_back(tensor); RETURN_NOT_OK(src->Tell(&offset)); } return Status::OK(); } -Status DeserializePythonSequence(std::shared_ptr batch, - std::vector> tensors, - PyObject* base, PyObject** out) { +Status DeserializeObject(const SerializedPyObject& obj, PyObject* base, PyObject** out) { PyAcquireGIL lock; - return DeserializeList(batch->column(0), 0, batch->num_rows(), base, tensors, out); + return DeserializeList(obj.batch->column(0), 0, obj.batch->num_rows(), base, + obj.tensors, out); } } // namespace py diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h index f1356e1162bb3..559ce18c50709 100644 --- a/cpp/src/arrow/python/arrow_to_python.h +++ b/cpp/src/arrow/python/arrow_to_python.h @@ -24,7 +24,9 @@ #include #include +#include "arrow/python/python_to_arrow.h" #include "arrow/status.h" +#include "arrow/util/visibility.h" namespace arrow { @@ -39,14 +41,24 @@ class RandomAccessFile; namespace py { -Status ReadSerializedPythonSequence(std::shared_ptr src, - std::shared_ptr* batch_out, - std::vector>* tensors_out); +/// \brief Read serialized Python sequence from file interface using Arrow IPC +/// \param[in] src a RandomAccessFile +/// \param[out] out the reconstructed data +/// \return Status +ARROW_EXPORT +Status ReadSerializedObject(std::shared_ptr src, + SerializedPyObject* out); -// This acquires the GIL -Status DeserializePythonSequence(std::shared_ptr batch, - std::vector> tensors, - PyObject* base, PyObject** out); +/// \brief Reconstruct Python object from Arrow-serialized representation +/// \param[in] object +/// \param[in] base a Python object holding the underlying data that any NumPy +/// arrays will reference, to avoid premature deallocation +/// \param[out] out the returned object +/// \return Status +/// This acquires the GIL +ARROW_EXPORT +Status DeserializeObject(const SerializedPyObject& object, PyObject* base, + PyObject** out); } // namespace py } // namespace arrow diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index d6a9213a7953f..47d8ef60c4b48 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -35,6 +36,7 @@ #include "arrow/python/helpers.h" #include "arrow/python/numpy_convert.h" #include "arrow/python/platform.h" +#include "arrow/tensor.h" #include "arrow/util/logging.h" constexpr int32_t kMaxRecursionDepth = 100; @@ -47,44 +49,6 @@ PyObject* pyarrow_deserialize_callback = NULL; namespace arrow { namespace py { -#define UPDATE(OFFSET, TAG) \ - if (TAG == -1) { \ - TAG = num_tags; \ - num_tags += 1; \ - } \ - RETURN_NOT_OK(offsets_.Append(static_cast(OFFSET))); \ - RETURN_NOT_OK(types_.Append(TAG)); \ - RETURN_NOT_OK(nones_.AppendToBitmap(true)); - -#define ADD_ELEMENT(VARNAME, TAG) \ - if (TAG != -1) { \ - types[TAG] = std::make_shared("", VARNAME.type()); \ - RETURN_NOT_OK(VARNAME.Finish(&children[TAG])); \ - RETURN_NOT_OK(nones_.AppendToBitmap(true)); \ - type_ids.push_back(TAG); \ - } - -#define ADD_SUBSEQUENCE(DATA, OFFSETS, BUILDER, TAG, NAME) \ - if (DATA) { \ - DCHECK(DATA->length() == OFFSETS.back()); \ - std::shared_ptr offset_array; \ - Int32Builder builder(pool_, std::make_shared()); \ - RETURN_NOT_OK(builder.Append(OFFSETS.data(), OFFSETS.size())); \ - RETURN_NOT_OK(builder.Finish(&offset_array)); \ - std::shared_ptr list_array; \ - RETURN_NOT_OK(ListArray::FromArrays(*offset_array, *DATA, pool_, &list_array)); \ - auto field = std::make_shared(NAME, list_array->type()); \ - auto type = \ - std::make_shared(std::vector>({field})); \ - types[TAG] = std::make_shared("", type); \ - children[TAG] = std::shared_ptr( \ - new StructArray(type, list_array->length(), {list_array})); \ - RETURN_NOT_OK(nones_.AppendToBitmap(true)); \ - type_ids.push_back(TAG); \ - } else { \ - DCHECK_EQ(OFFSETS.size(), 1); \ - } - /// A Sequence is a heterogeneous collections of elements. It can contain /// scalar Python types, lists, tuples, dictionaries and tensors. class SequenceBuilder { @@ -112,53 +76,64 @@ class SequenceBuilder { return nones_.AppendToBitmap(false); } + Status Update(int64_t offset, int8_t* tag) { + if (*tag == -1) { + *tag = num_tags_++; + } + RETURN_NOT_OK(offsets_.Append(static_cast(offset))); + RETURN_NOT_OK(types_.Append(*tag)); + return nones_.AppendToBitmap(true); + } + + template + Status AppendPrimitive(const T val, int8_t* tag, BuilderType* out) { + RETURN_NOT_OK(Update(out->length(), tag)); + return out->Append(val); + } + /// Appending a boolean to the sequence - Status AppendBool(bool data) { - UPDATE(bools_.length(), bool_tag); - return bools_.Append(data); + Status AppendBool(const bool data) { + return AppendPrimitive(data, &bool_tag_, &bools_); } /// Appending an int64_t to the sequence - Status AppendInt64(int64_t data) { - UPDATE(ints_.length(), int_tag); - return ints_.Append(data); + Status AppendInt64(const int64_t data) { + return AppendPrimitive(data, &int_tag_, &ints_); } /// Appending an uint64_t to the sequence - Status AppendUInt64(uint64_t data) { - UPDATE(ints_.length(), int_tag); - return ints_.Append(data); + Status AppendUInt64(const uint64_t data) { + // TODO(wesm): Bounds check + return AppendPrimitive(static_cast(data), &int_tag_, &ints_); } /// Append a list of bytes to the sequence Status AppendBytes(const uint8_t* data, int32_t length) { - UPDATE(bytes_.length(), bytes_tag); + RETURN_NOT_OK(Update(bytes_.length(), &bytes_tag_)); return bytes_.Append(data, length); } /// Appending a string to the sequence Status AppendString(const char* data, int32_t length) { - UPDATE(strings_.length(), string_tag); + RETURN_NOT_OK(Update(strings_.length(), &string_tag_)); return strings_.Append(data, length); } /// Appending a float to the sequence - Status AppendFloat(float data) { - UPDATE(floats_.length(), float_tag); - return floats_.Append(data); + Status AppendFloat(const float data) { + return AppendPrimitive(data, &float_tag_, &floats_); } /// Appending a double to the sequence - Status AppendDouble(double data) { - UPDATE(doubles_.length(), double_tag); - return doubles_.Append(data); + Status AppendDouble(const double data) { + return AppendPrimitive(data, &double_tag_, &doubles_); } /// Appending a tensor to the sequence /// /// \param tensor_index Index of the tensor in the object. - Status AppendTensor(int32_t tensor_index) { - UPDATE(tensor_indices_.length(), tensor_tag); + Status AppendTensor(const int32_t tensor_index) { + RETURN_NOT_OK(Update(tensor_indices_.length(), &tensor_tag_)); return tensor_indices_.Append(tensor_index); } @@ -176,45 +151,78 @@ class SequenceBuilder { /// \param size /// The size of the sublist Status AppendList(Py_ssize_t size) { - UPDATE(list_offsets_.size() - 1, list_tag); + RETURN_NOT_OK(Update(list_offsets_.size() - 1, &list_tag_)); list_offsets_.push_back(list_offsets_.back() + static_cast(size)); return Status::OK(); } Status AppendTuple(Py_ssize_t size) { - UPDATE(tuple_offsets_.size() - 1, tuple_tag); + RETURN_NOT_OK(Update(tuple_offsets_.size() - 1, &tuple_tag_)); tuple_offsets_.push_back(tuple_offsets_.back() + static_cast(size)); return Status::OK(); } Status AppendDict(Py_ssize_t size) { - UPDATE(dict_offsets_.size() - 1, dict_tag); + RETURN_NOT_OK(Update(dict_offsets_.size() - 1, &dict_tag_)); dict_offsets_.push_back(dict_offsets_.back() + static_cast(size)); return Status::OK(); } + template + Status AddElement(const int8_t tag, BuilderType* out) { + if (tag != -1) { + fields_[tag] = ::arrow::field("", out->type()); + RETURN_NOT_OK(out->Finish(&children_[tag])); + RETURN_NOT_OK(nones_.AppendToBitmap(true)); + type_ids_.push_back(tag); + } + return Status::OK(); + } + + Status AddSubsequence(int8_t tag, const Array* data, + const std::vector& offsets, const std::string& name) { + if (data != nullptr) { + DCHECK(data->length() == offsets.back()); + std::shared_ptr offset_array; + Int32Builder builder(pool_, std::make_shared()); + RETURN_NOT_OK(builder.Append(offsets.data(), offsets.size())); + RETURN_NOT_OK(builder.Finish(&offset_array)); + std::shared_ptr list_array; + RETURN_NOT_OK(ListArray::FromArrays(*offset_array, *data, pool_, &list_array)); + auto field = ::arrow::field(name, list_array->type()); + auto type = ::arrow::struct_({field}); + fields_[tag] = ::arrow::field("", type); + children_[tag] = std::shared_ptr( + new StructArray(type, list_array->length(), {list_array})); + RETURN_NOT_OK(nones_.AppendToBitmap(true)); + type_ids_.push_back(tag); + } else { + DCHECK_EQ(offsets.size(), 1); + } + return Status::OK(); + } + /// Finish building the sequence and return the result. - Status Finish(std::shared_ptr list_data, std::shared_ptr tuple_data, - std::shared_ptr dict_data, std::shared_ptr* out) { - std::vector> types(num_tags); - std::vector> children(num_tags); - std::vector type_ids; - - ADD_ELEMENT(bools_, bool_tag); - ADD_ELEMENT(ints_, int_tag); - ADD_ELEMENT(strings_, string_tag); - ADD_ELEMENT(bytes_, bytes_tag); - ADD_ELEMENT(floats_, float_tag); - ADD_ELEMENT(doubles_, double_tag); - - ADD_ELEMENT(tensor_indices_, tensor_tag); - - ADD_SUBSEQUENCE(list_data, list_offsets_, list_builder, list_tag, "list"); - ADD_SUBSEQUENCE(tuple_data, tuple_offsets_, tuple_builder, tuple_tag, "tuple"); - ADD_SUBSEQUENCE(dict_data, dict_offsets_, dict_builder, dict_tag, "dict"); - - auto type = ::arrow::union_(types, type_ids, UnionMode::DENSE); - out->reset(new UnionArray(type, types_.length(), children, types_.data(), + /// Input arrays may be nullptr + Status Finish(const Array* list_data, const Array* tuple_data, const Array* dict_data, + std::shared_ptr* out) { + fields_.resize(num_tags_); + children_.resize(num_tags_); + + RETURN_NOT_OK(AddElement(bool_tag_, &bools_)); + RETURN_NOT_OK(AddElement(int_tag_, &ints_)); + RETURN_NOT_OK(AddElement(string_tag_, &strings_)); + RETURN_NOT_OK(AddElement(bytes_tag_, &bytes_)); + RETURN_NOT_OK(AddElement(float_tag_, &floats_)); + RETURN_NOT_OK(AddElement(double_tag_, &doubles_)); + RETURN_NOT_OK(AddElement(tensor_tag_, &tensor_indices_)); + + RETURN_NOT_OK(AddSubsequence(list_tag_, list_data, list_offsets_, "list")); + RETURN_NOT_OK(AddSubsequence(tuple_tag_, tuple_data, tuple_offsets_, "tuple")); + RETURN_NOT_OK(AddSubsequence(dict_tag_, dict_data, dict_offsets_, "dict")); + + auto type = ::arrow::union_(fields_, type_ids_, UnionMode::DENSE); + out->reset(new UnionArray(type, types_.length(), children_, types_.data(), offsets_.data(), nones_.null_bitmap(), nones_.null_count())); return Status::OK(); @@ -249,19 +257,24 @@ class SequenceBuilder { // SequenceBuilder::Finish. If a member with one of the tags is added, // the associated variable gets a unique index starting from 0. This // happens in the UPDATE macro in sequence.cc. - int8_t bool_tag = -1; - int8_t int_tag = -1; - int8_t string_tag = -1; - int8_t bytes_tag = -1; - int8_t float_tag = -1; - int8_t double_tag = -1; - - int8_t tensor_tag = -1; - int8_t list_tag = -1; - int8_t tuple_tag = -1; - int8_t dict_tag = -1; - - int8_t num_tags = 0; + int8_t bool_tag_ = -1; + int8_t int_tag_ = -1; + int8_t string_tag_ = -1; + int8_t bytes_tag_ = -1; + int8_t float_tag_ = -1; + int8_t double_tag_ = -1; + + int8_t tensor_tag_ = -1; + int8_t list_tag_ = -1; + int8_t tuple_tag_ = -1; + int8_t dict_tag_ = -1; + + int8_t num_tags_ = 0; + + // Members for the output union constructed in Finish + std::vector> fields_; + std::vector> children_; + std::vector type_ids_; }; /// Constructing dictionaries of key/value pairs. Sequences of @@ -287,11 +300,9 @@ class DictBuilder { /// \param dict_data /// List containing the data from nested dictionaries in the /// value list of the dictionary - Status Finish(std::shared_ptr key_tuple_data, - std::shared_ptr key_dict_data, - std::shared_ptr val_list_data, - std::shared_ptr val_tuple_data, - std::shared_ptr val_dict_data, std::shared_ptr* out) { + Status Finish(const Array* key_tuple_data, const Array* key_dict_data, + const Array* val_list_data, const Array* val_tuple_data, + const Array* val_dict_data, std::shared_ptr* out) { // lists and dicts can't be keys of dicts in Python, that is why for // the keys we do not need to collect sublists std::shared_ptr keys, vals; @@ -530,7 +541,7 @@ Status SerializeSequences(std::vector sequences, int32_t recursion_de if (subdicts.size() > 0) { RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict, tensors_out)); } - return builder.Finish(list, tuple, dict, out); + return builder.Finish(list.get(), tuple.get(), dict.get(), out); } Status SerializeDict(std::vector dicts, int32_t recursion_depth, @@ -578,8 +589,9 @@ Status SerializeDict(std::vector dicts, int32_t recursion_depth, RETURN_NOT_OK( SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr, tensors_out)); } - RETURN_NOT_OK(result.Finish(key_tuples_arr, key_dicts_arr, val_list_arr, val_tuples_arr, - val_dict_arr, out)); + RETURN_NOT_OK(result.Finish(key_tuples_arr.get(), key_dicts_arr.get(), + val_list_arr.get(), val_tuples_arr.get(), + val_dict_arr.get(), out)); // This block is used to decrement the reference counts of the results // returned by the serialization callback, which is called in SerializeArray, @@ -605,37 +617,33 @@ std::shared_ptr MakeBatch(std::shared_ptr data) { return std::shared_ptr(new RecordBatch(schema, data->length(), {data})); } -Status SerializePythonSequence(PyObject* sequence, - std::shared_ptr* batch_out, - std::vector>* tensors_out) { +Status SerializeObject(PyObject* sequence, SerializedPyObject* out) { PyAcquireGIL lock; std::vector sequences = {sequence}; std::shared_ptr array; - std::vector tensors; - RETURN_NOT_OK(SerializeSequences(sequences, 0, &array, &tensors)); - *batch_out = MakeBatch(array); - for (const auto& tensor : tensors) { - std::shared_ptr out; - RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(), tensor, &out)); - tensors_out->push_back(out); + std::vector py_tensors; + RETURN_NOT_OK(SerializeSequences(sequences, 0, &array, &py_tensors)); + out->batch = MakeBatch(array); + for (const auto& py_tensor : py_tensors) { + std::shared_ptr arrow_tensor; + RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(), py_tensor, &arrow_tensor)); + out->tensors.push_back(arrow_tensor); } return Status::OK(); } -Status WriteSerializedPythonSequence(std::shared_ptr batch, - std::vector> tensors, - io::OutputStream* dst) { - int32_t num_tensors = static_cast(tensors.size()); +Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream* dst) { + int32_t num_tensors = static_cast(obj.tensors.size()); std::shared_ptr writer; int32_t metadata_length; int64_t body_length; RETURN_NOT_OK(dst->Write(reinterpret_cast(&num_tensors), sizeof(int32_t))); - RETURN_NOT_OK(ipc::RecordBatchStreamWriter::Open(dst, batch->schema(), &writer)); - RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); + RETURN_NOT_OK(ipc::RecordBatchStreamWriter::Open(dst, obj.batch->schema(), &writer)); + RETURN_NOT_OK(writer->WriteRecordBatch(*obj.batch)); RETURN_NOT_OK(writer->Close()); - for (const auto& tensor : tensors) { + for (const auto& tensor : obj.tensors) { RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length)); } diff --git a/cpp/src/arrow/python/python_to_arrow.h b/cpp/src/arrow/python/python_to_arrow.h index 2f9170846742a..f07de56538e35 100644 --- a/cpp/src/arrow/python/python_to_arrow.h +++ b/cpp/src/arrow/python/python_to_arrow.h @@ -24,6 +24,7 @@ #include #include "arrow/status.h" +#include "arrow/util/visibility.h" namespace arrow { @@ -38,17 +39,31 @@ class OutputStream; namespace py { +ARROW_EXPORT void set_serialization_callbacks(PyObject* serialize_callback, PyObject* deserialize_callback); -// This acquires the GIL -Status SerializePythonSequence(PyObject* sequence, - std::shared_ptr* batch_out, - std::vector>* tensors_out); +struct ARROW_EXPORT SerializedPyObject { + std::shared_ptr batch; + std::vector> tensors; +}; -Status WriteSerializedPythonSequence(std::shared_ptr batch, - std::vector> tensors, - io::OutputStream* dst); +/// \brief Serialize Python sequence as a RecordBatch plus +/// \param[in] sequence a Python sequence object to serialize to Arrow data +/// structures +/// \param[out] out the serialized representation +/// \return Status +/// +/// Release GIL before calling +ARROW_EXPORT +Status SerializeObject(PyObject* sequence, SerializedPyObject* out); + +/// \brief Write serialized Python object to OutputStream +/// \param[in] object a serialized Python object to write out +/// \param[out] dst an OutputStream +/// \return Status +ARROW_EXPORT +Status WriteSerializedObject(const SerializedPyObject& object, io::OutputStream* dst); } // namespace py } // namespace arrow diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index e6646562ccf15..ba5e938b6a788 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -776,29 +776,23 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil: cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil: - CStatus SerializePythonSequence( - PyObject* sequence, - shared_ptr[CRecordBatch]* batch_out, - vector[shared_ptr[CTensor]]* tensors_out) - - CStatus DeserializePythonSequence( - shared_ptr[CRecordBatch] batch, - vector[shared_ptr[CTensor]] tensors, - PyObject* base, - PyObject** out) - - cdef CStatus WriteSerializedPythonSequence( - shared_ptr[CRecordBatch] batch, - vector[shared_ptr[CTensor]] tensors, - OutputStream* dst) - - cdef CStatus ReadSerializedPythonSequence( - shared_ptr[RandomAccessFile] src, - shared_ptr[CRecordBatch]* batch_out, - vector[shared_ptr[CTensor]]* tensors_out) - - void set_serialization_callbacks(PyObject* serialize_callback, - PyObject* deserialize_callback); + cdef cppclass SerializedPyObject: + shared_ptr[CRecordBatch] batch + vector[shared_ptr[CTensor]] tensors + + CStatus SerializeObject(object sequence, SerializedPyObject* out) + + CStatus WriteSerializedObject(const SerializedPyObject& obj, + OutputStream* dst) + + CStatus DeserializeObject(const SerializedPyObject& obj, + PyObject* base, PyObject** out) + + CStatus ReadSerializedObject(shared_ptr[RandomAccessFile] src, + SerializedPyObject* out) + + void set_serialization_callbacks(object serialize_callback, + object deserialize_callback) cdef extern from 'arrow/python/init.h': diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index d979934ebf2de..bc33da1554bba 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -19,6 +19,7 @@ from cpython.ref cimport PyObject from pyarrow.compat import pickle + def is_named_tuple(cls): """Return True if cls is a namedtuple and False otherwise.""" b = cls.__bases__ @@ -49,6 +50,7 @@ types_to_pickle = set() custom_serializers = dict() custom_deserializers = dict() + def register_type(type, type_id, pickle=False, custom_serializer=None, custom_deserializer=None): """Add type to the list of types we can serialize. @@ -77,6 +79,7 @@ def register_type(type, type_id, pickle=False, custom_serializers[type_id] = custom_serializer custom_deserializers[type_id] = custom_deserializer + def _serialization_callback(obj): if type(obj) not in type_to_type_id: raise SerializationException("pyarrow does not know how to " @@ -99,6 +102,7 @@ def _serialization_callback(obj): "the object '{}'".format(obj), obj) return dict(serialized_obj, **{"_pytype_": type_id}) + def _deserialization_callback(serialized_obj): type_id = serialized_obj["_pytype_"] @@ -122,8 +126,10 @@ def _deserialization_callback(serialized_obj): obj.__dict__.update(serialized_obj) return obj -set_serialization_callbacks( _serialization_callback, - _deserialization_callback) + +set_serialization_callbacks(_serialization_callback, + _deserialization_callback) + def serialize_sequence(object value, NativeFile sink): """Serialize a Python sequence to a file. @@ -138,12 +144,12 @@ def serialize_sequence(object value, NativeFile sink): cdef shared_ptr[OutputStream] stream sink.write_handle(&stream) - cdef shared_ptr[CRecordBatch] batch - cdef vector[shared_ptr[CTensor]] tensors + cdef SerializedPyObject serialized with nogil: - check_status(SerializePythonSequence( value, &batch, &tensors)) - check_status(WriteSerializedPythonSequence(batch, tensors, stream.get())) + check_status(SerializeObject(value, &serialized)) + check_status(WriteSerializedObject(serialized, stream.get())) + def deserialize_sequence(NativeFile source, object base): """Deserialize a Python sequence from a file. @@ -164,12 +170,12 @@ def deserialize_sequence(NativeFile source, object base): cdef shared_ptr[RandomAccessFile] stream source.read_handle(&stream) - cdef shared_ptr[CRecordBatch] batch - cdef vector[shared_ptr[CTensor]] tensors + cdef SerializedPyObject serialized cdef PyObject* result with nogil: - check_status(ReadSerializedPythonSequence(stream, &batch, &tensors)) - check_status(DeserializePythonSequence(batch, tensors, base, &result)) + check_status(ReadSerializedObject(stream, &serialized)) + check_status(DeserializeObject(serialized, base, &result)) - return result + # This is necessary to avoid a memory leak + return PyObject_to_object(result) diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py index bca9449a3d7bd..731e8d5250a79 100644 --- a/python/pyarrow/tests/test_serialization.py +++ b/python/pyarrow/tests/test_serialization.py @@ -19,7 +19,7 @@ from __future__ import division from __future__ import print_function -from collections import defaultdict, namedtuple +from collections import namedtuple import os import string import sys @@ -85,17 +85,20 @@ def assert_equal(obj1, obj2): def array_custom_serializer(obj): return obj.tolist(), obj.dtype.str + def array_custom_deserializer(serialized_obj): return np.array(serialized_obj[0], dtype=np.dtype(serialized_obj[1])) pa.lib.register_type(np.ndarray, 20 * b"\x00", pickle=False, - custom_serializer=array_custom_serializer, - custom_deserializer=array_custom_deserializer) + custom_serializer=array_custom_serializer, + custom_deserializer=array_custom_deserializer) if sys.version_info >= (3, 0): long_extras = [0, np.array([["hi", u"hi"], [1.3, 1]])] else: - long_extras = [long(0), np.array([["hi", u"hi"], [1.3, long(1)]])] # noqa: E501,F821 + _LONG_ZERO, _LONG_ONE = long(0), long(1) # noqa: E501,F821 + long_extras = [_LONG_ZERO, np.array([["hi", u"hi"], + [1.3, _LONG_ONE]])] PRIMITIVE_OBJECTS = [ 0, 0.0, 0.9, 1 << 62, 1 << 100, 1 << 999, @@ -115,6 +118,7 @@ def array_custom_deserializer(serialized_obj): ((((((((((),),),),),),),),),), {"a": {"b": {"c": {"d": {}}}}}] + class Foo(object): def __init__(self, value=0): self.value = value @@ -178,12 +182,18 @@ class CustomError(Exception): # arbitrary precision integers. This is only called on long integers, # see the associated case in the append method in python_to_arrow.cc pa.lib.register_type(int, 20 * b"\x10", pickle=False, - custom_serializer=lambda obj: str(obj), - custom_deserializer=lambda serialized_obj: int(serialized_obj)) + custom_serializer=lambda obj: str(obj), + custom_deserializer=( + lambda serialized_obj: int(serialized_obj))) + + if (sys.version_info < (3, 0)): - pa.lib.register_type(long, 20 * b"\x11", pickle=False, - custom_serializer=lambda obj: str(obj), - custom_deserializer=lambda serialized_obj: long(serialized_obj)) + deserializer = ( + lambda serialized_obj: long(serialized_obj)) # noqa: E501,F821 + pa.lib.register_type(long, 20 * b"\x11", pickle=False, # noqa: E501,F821 + custom_serializer=lambda obj: str(obj), + custom_deserializer=deserializer) + def serialization_roundtrip(value, f): f.seek(0) @@ -193,7 +203,7 @@ def serialization_roundtrip(value, f): assert_equal(value, result) # Create a large memory mapped file -SIZE = 100 * 1024 * 1024 # 100 MB +SIZE = 100 * 1024 * 1024 # 100 MB arr = np.random.randint(0, 256, size=SIZE).astype('u1') data = arr.tobytes()[:SIZE] path = os.path.join("/tmp/pyarrow-temp-file") @@ -202,14 +212,17 @@ def serialization_roundtrip(value, f): MEMORY_MAPPED_FILE = pa.memory_map(path, mode="r+") + def test_primitive_serialization(): for obj in PRIMITIVE_OBJECTS: serialization_roundtrip([obj], MEMORY_MAPPED_FILE) + def test_complex_serialization(): for obj in COMPLEX_OBJECTS: serialization_roundtrip([obj], MEMORY_MAPPED_FILE) + def test_custom_serialization(): for obj in CUSTOM_OBJECTS: serialization_roundtrip([obj], MEMORY_MAPPED_FILE)