diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index da79a7d4baa43..4da94821f04f0 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -3199,6 +3199,41 @@ TEST(TestArrowReaderAdHoc, HandleDictPageOffsetZero) { TryReadDataFile(test::get_data_file("dict-page-offset-zero.parquet")); } +TEST(TestArrowReaderAdHoc, WriteBatchedNestedNullableStringColumn) { + // ARROW-10493 + std::vector<std::shared_ptr<::arrow::Field>> fields{ + ::arrow::field("s", ::arrow::utf8(), /*nullable=*/true), + ::arrow::field("d", ::arrow::decimal128(4, 2), /*nullable=*/true), + ::arrow::field("b", ::arrow::boolean(), /*nullable=*/true), + ::arrow::field("i8", ::arrow::int8(), /*nullable=*/true), + ::arrow::field("i64", ::arrow::int64(), /*nullable=*/true)}; + auto type = ::arrow::struct_(fields); + auto outer_array = ::arrow::ArrayFromJSON( + type, + R"([{"s": "abc", "d": "1.23", "b": true, "i8": 10, "i64": 11 }, + {"s": "de", "d": "3.45", "b": true, "i8": 12, "i64": 13 }, + {"s": "fghi", "d": "6.78", "b": false, "i8": 14, "i64": 15 }, + {}, + {"s": "jklmo", "d": "9.10", "b": true, "i8": 16, "i64": 17 }, + null, + {"s": "p", "d": "11.12", "b": false, "i8": 18, "i64": 19 }, + {"s": "qrst", "d": "13.14", "b": false, "i8": 20, "i64": 21 }, + {}, + {"s": "uvw", "d": "15.16", "b": true, "i8": 22, "i64": 23 }, + {"s": "x", "d": "17.18", "b": false, "i8": 24, "i64": 25 }, + {}, + null])"); + + auto expected = Table::Make( + ::arrow::schema({::arrow::field("outer", type, /*nullable=*/true)}), {outer_array}); + + auto write_props = WriterProperties::Builder().write_batch_size(4)->build(); + + std::shared_ptr<Table> actual; + DoRoundtrip(expected, /*row_group_size=*/outer_array->length(), &actual, write_props); + ::arrow::AssertTablesEqual(*expected, *actual, /*same_chunk_layout=*/false); +} + class TestArrowReaderAdHocSparkAndHvr : public ::testing::TestWithParam< std::tuple<std::string, std::shared_ptr<::arrow::DataType>>> {}; diff --git
a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index ed7058f82eb72..cc7008915f5b9 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -33,10 +33,12 @@ #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit_stream_utils.h" +#include "arrow/util/bitmap_ops.h" #include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" #include "arrow/util/logging.h" #include "arrow/util/rle_encoding.h" +#include "arrow/visitor_inline.h" #include "parquet/column_page.h" #include "parquet/encoding.h" #include "parquet/encryption_internal.h" @@ -53,15 +55,82 @@ using arrow::Array; using arrow::ArrayData; using arrow::Datum; +using arrow::Result; using arrow::Status; using arrow::BitUtil::BitWriter; using arrow::internal::checked_cast; +using arrow::internal::checked_pointer_cast; using arrow::util::RleEncoder; namespace parquet { namespace { +// Visitor that extracts the value buffer from a FlatArray at a given offset.
+struct ValueBufferSlicer { + template <typename T> + ::arrow::enable_if_base_binary<typename T::TypeClass, Status> Visit(const T& array) { + auto data = array.data(); + buffer_ = + SliceBuffer(data->buffers[1], data->offset * sizeof(typename T::offset_type), + data->length * sizeof(typename T::offset_type)); + return Status::OK(); + } + + template <typename T> + ::arrow::enable_if_fixed_size_binary<typename T::TypeClass, Status> Visit( + const T& array) { + auto data = array.data(); + buffer_ = SliceBuffer(data->buffers[1], data->offset * array.byte_width(), + data->length * array.byte_width()); + return Status::OK(); + } + + template <typename T> + ::arrow::enable_if_t<::arrow::has_c_type<typename T::TypeClass>::value && + !std::is_same<::arrow::BooleanType, typename T::TypeClass>::value, + Status> + Visit(const T& array) { + auto data = array.data(); + buffer_ = SliceBuffer( + data->buffers[1], + ::arrow::TypeTraits<typename T::TypeClass>::bytes_required(data->offset), + ::arrow::TypeTraits<typename T::TypeClass>::bytes_required(data->length)); + return Status::OK(); + } + + Status Visit(const ::arrow::BooleanArray& array) { + auto data = array.data(); + if (BitUtil::IsMultipleOf8(data->offset)) { + buffer_ = SliceBuffer(data->buffers[1], BitUtil::BytesForBits(data->offset), + BitUtil::BytesForBits(data->length)); + return Status::OK(); + } + PARQUET_ASSIGN_OR_THROW(buffer_, + ::arrow::internal::CopyBitmap(pool_, data->buffers[1]->data(), + data->offset, data->length)); + return Status::OK(); + } +#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix) \ + Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \ + return Status::NotImplemented("Slicing not implemented for " #ArrowTypePrefix); \ + } + + NOT_IMPLEMENTED_VISIT(Null); + NOT_IMPLEMENTED_VISIT(Union); + NOT_IMPLEMENTED_VISIT(List); + NOT_IMPLEMENTED_VISIT(LargeList); + NOT_IMPLEMENTED_VISIT(Struct); + NOT_IMPLEMENTED_VISIT(FixedSizeList); + NOT_IMPLEMENTED_VISIT(Dictionary); + NOT_IMPLEMENTED_VISIT(Extension); + +#undef NOT_IMPLEMENTED_VISIT + + MemoryPool* pool_; + std::shared_ptr<Buffer> buffer_; +}; + internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) { internal::LevelInfo level_info;
level_info.def_level = descr->max_definition_level(); @@ -1221,15 +1290,24 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< *null_count = io.null_count; } - std::shared_ptr<Array> MaybeReplaceValidity(std::shared_ptr<Array> array, - int64_t new_null_count) { + Result<std::shared_ptr<Array>> MaybeReplaceValidity(std::shared_ptr<Array> array, + int64_t new_null_count, + arrow::MemoryPool* memory_pool) { if (bits_buffer_ == nullptr) { return array; } std::vector<std::shared_ptr<Buffer>> buffers = array->data()->buffers; + if (buffers.empty()) { + return array; + } buffers[0] = bits_buffer_; // Should be a leaf array. - DCHECK_EQ(array->num_fields(), 0); + DCHECK_GT(buffers.size(), 1); + ValueBufferSlicer slicer{memory_pool, /*buffer=*/nullptr}; + if (array->data()->offset > 0) { + RETURN_NOT_OK(::arrow::VisitArrayInline(*array, &slicer)); + buffers[1] = slicer.buffer_; + } return arrow::MakeArray(std::make_shared<ArrayData>( array->type(), array->length(), std::move(buffers), new_null_count)); } @@ -1382,7 +1460,9 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary( AddIfNotNull(rep_levels, offset)); std::shared_ptr<Array> writeable_indices = indices->Slice(value_offset, batch_num_spaced_values); - writeable_indices = MaybeReplaceValidity(writeable_indices, null_count); + PARQUET_ASSIGN_OR_THROW( + writeable_indices, + MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool)); dict_encoder->PutIndices(*writeable_indices); CommitWriteAndCheckPageLimit(batch_size, batch_num_values); value_offset += batch_num_spaced_values; @@ -1809,7 +1889,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDense( AddIfNotNull(rep_levels, offset)); std::shared_ptr<Array> data_slice = array.Slice(value_offset, batch_num_spaced_values); - data_slice = MaybeReplaceValidity(data_slice, null_count); + PARQUET_ASSIGN_OR_THROW( + data_slice, MaybeReplaceValidity(data_slice, null_count, ctx->memory_pool)); current_encoder_->Put(*data_slice); if (page_statistics_ != nullptr) { @@ -1855,8 +1936,6 @@ struct SerializeFunctor< //
---------------------------------------------------------------------- // Write Arrow to Decimal128 -using ::arrow::internal::checked_pointer_cast; - // Requires a custom serializer because decimal128 in parquet are in big-endian // format. Thus, a temporary local buffer is required. template