From 196fbe5f5e0669b1f8bf71005c8a4c6a4e0fb2da Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 27 Mar 2023 13:42:47 -0700 Subject: [PATCH] GH-34639: [C++] Support RecordBatch::FromStructArray even if struct array has nulls/offsets (#34691) ### Rationale for this change A struct array can have a validity map and an offset. A record batch cannot. When converting from a struct array to a record batch we were throwing an error if a validity map was present and returning the wrong data if an offset was present. ### What changes are included in this PR? If a validity map or offset are present then StructArray::Flatten is used to push the offset and validity map into the struct array. Note: this means that RecordBatch::FromStructArray will not be zero-copy if it has to push down a validity map. ### Are these changes tested? Yes ### Are there any user-facing changes? Yes, RecordBatch::FromStructArray now takes in a memory pool because it might have to make allocations when pushing validity bitmaps down. * Closes: #34639 Authored-by: Weston Pace Signed-off-by: Weston Pace --- cpp/src/arrow/json/reader.cc | 5 +++-- cpp/src/arrow/record_batch.cc | 14 ++++++++++---- cpp/src/arrow/record_batch.h | 12 +++++++++--- cpp/src/arrow/record_batch_test.cc | 19 ++++++++++++++++++- 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc index dae06d5bf6171..fe3c48b1aee4a 100644 --- a/cpp/src/arrow/json/reader.cc +++ b/cpp/src/arrow/json/reader.cc @@ -317,7 +317,8 @@ class DecodingOperator { std::shared_ptr chunked; RETURN_NOT_OK(builder->Finish(&chunked)); - ARROW_ASSIGN_OR_RAISE(auto batch, RecordBatch::FromStructArray(chunked->chunk(0))); + ARROW_ASSIGN_OR_RAISE( + auto batch, RecordBatch::FromStructArray(chunked->chunk(0), context_->pool())); return DecodedBlock{std::move(batch), num_bytes}; } @@ -552,7 +553,7 @@ Result> ParseOne(ParseOptions options, std::shared_ptr converted_chunked; RETURN_NOT_OK(builder->Finish(&converted_chunked)); - return RecordBatch::FromStructArray(converted_chunked->chunk(0)); + return RecordBatch::FromStructArray(converted_chunked->chunk(0), context.pool()); } } // namespace json diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 28245c8f5de49..c24a19e2f43b0 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -198,14 +198,20 @@ Result> RecordBatch::MakeEmpty( } Result> RecordBatch::FromStructArray( - const std::shared_ptr& array) { + const std::shared_ptr& array, MemoryPool* memory_pool) { if (array->type_id() != Type::STRUCT) { return Status::TypeError("Cannot construct record batch from array of type ", *array->type()); } - if (array->null_count() != 0) { - return Status::Invalid( - "Unable to construct record batch from a StructArray with non-zero nulls."); + if (array->null_count() != 0 || array->offset() != 0) { + // If the struct array has a validity map or offset we need to push those into + // the child arrays via Flatten since the RecordBatch doesn't have validity/offset + const std::shared_ptr& struct_array = + internal::checked_pointer_cast(array); + ARROW_ASSIGN_OR_RAISE(std::vector> fields, + struct_array->Flatten(memory_pool)); + return Make(arrow::schema(array->type()->fields()), array->length(), + std::move(fields)); } return Make(arrow::schema(array->type()->fields()), array->length(), array->data()->child_data); diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 8bc70322560b6..a5121ab397d1e 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -82,10 +82,16 @@ class ARROW_EXPORT RecordBatch { /// \brief Construct record batch from struct array /// /// This constructs a record batch using the child arrays of the given - /// array, which must be a struct array. Note that the struct array's own - /// null bitmap is not reflected in the resulting record batch. + /// array, which must be a struct array. + /// + /// \param[in] array the source array, must be a StructArray + /// \param[in] pool the memory pool to allocate new validity bitmaps + /// + /// This operation will usually be zero-copy. However, if the struct array has an + /// offset or a validity bitmap then these will need to be pushed into the child arrays. + /// Pushing the offset is zero-copy but pushing the validity bitmap is not. static Result> FromStructArray( - const std::shared_ptr& array); + const std::shared_ptr& array, MemoryPool* pool = default_memory_pool()); /// \brief Determine if two record batches are exactly equal /// diff --git a/cpp/src/arrow/record_batch_test.cc b/cpp/src/arrow/record_batch_test.cc index ed2f3e552e77b..9dab74c223ed4 100644 --- a/cpp/src/arrow/record_batch_test.cc +++ b/cpp/src/arrow/record_batch_test.cc @@ -331,6 +331,19 @@ TEST_F(TestRecordBatch, ToFromEmptyStructArray) { ASSERT_TRUE(batch1->Equals(*batch2)); } +TEST_F(TestRecordBatch, FromSlicedStructArray) { + static constexpr int64_t kLength = 10; + std::shared_ptr x_arr = ArrayFromJSON(int64(), "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"); + StructArray struct_array(struct_({field("x", int64())}), kLength, {x_arr}); + std::shared_ptr sliced = struct_array.Slice(5, 3); + ASSERT_OK_AND_ASSIGN(auto batch, RecordBatch::FromStructArray(sliced)); + + std::shared_ptr expected_arr = ArrayFromJSON(int64(), "[5, 6, 7]"); + std::shared_ptr expected = + RecordBatch::Make(schema({field("x", int64())}), 3, {expected_arr}); + AssertBatchesEqual(*expected, *batch); +} + TEST_F(TestRecordBatch, FromStructArrayInvalidType) { random::RandomArrayGenerator gen(42); ASSERT_RAISES(TypeError, RecordBatch::FromStructArray(gen.ArrayOf(int32(), 6))); @@ -339,7 +352,11 @@ TEST_F(TestRecordBatch, FromStructArrayInvalidType) { TEST_F(TestRecordBatch, FromStructArrayInvalidNullCount) { auto struct_array = ArrayFromJSON(struct_({field("f1", int32())}), R"([{"f1": 1}, null])"); - ASSERT_RAISES(Invalid, RecordBatch::FromStructArray(struct_array)); + ASSERT_OK_AND_ASSIGN(auto batch, RecordBatch::FromStructArray(struct_array)); + std::shared_ptr expected_arr = ArrayFromJSON(int32(), "[1, null]"); + std::shared_ptr expected = + RecordBatch::Make(schema({field("f1", int32())}), 2, {expected_arr}); + AssertBatchesEqual(*expected, *batch); } TEST_F(TestRecordBatch, MakeEmpty) {