Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-10493: [C++][Parquet] Fix offset lost in MaybeReplaceValidity #8589

Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions cpp/src/arrow/array/array_base.h
Expand Up @@ -207,6 +207,14 @@ static inline std::ostream& operator<<(std::ostream& os, const Array& x) {

/// Base class for non-nested arrays
class ARROW_EXPORT FlatArray : public Array {
public:
/// \brief Return the part of the array's second buffer (`buffers[1]`) that
/// backs this array's logical range.
///
/// For primitive arrays this is the values buffer; for variable-length
/// binary types it is the offsets buffer. The result starts at this array's
/// first element, so it can be paired with an ArrayData offset of 0.
///
/// \param[in] pool memory pool available to implementations that cannot
/// return a zero-copy slice and must allocate a copy instead (for example
/// bit-packed values that start at a non-byte-aligned offset)
/// \return the sliced (or copied) buffer; implementations without a second
/// buffer may return a null pointer
///
/// \note API EXPERIMENTAL
virtual Result<std::shared_ptr<Buffer>> SlicedValues(MemoryPool* pool) const = 0;
bkietz marked this conversation as resolved.
Show resolved Hide resolved

protected:
using Array::Array;
};
Expand Down Expand Up @@ -243,6 +251,10 @@ class ARROW_EXPORT NullArray : public FlatArray {
/// Construct from existing ArrayData; forwards the data to SetData().
explicit NullArray(const std::shared_ptr<ArrayData>& data) { SetData(data); }
/// Construct from a length. Declared here; the definition is not inline.
explicit NullArray(int64_t length);

/// Always yields a null buffer pointer; `pool` is not consulted.
Result<std::shared_ptr<Buffer>> SlicedValues(MemoryPool* pool) const override {
  return std::shared_ptr<Buffer>(NULLPTR);
}

private:
void SetData(const std::shared_ptr<ArrayData>& data) {
null_bitmap_data_ = NULLPTR;
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/array/array_binary.h
Expand Up @@ -69,6 +69,11 @@ class BaseBinaryArray : public FlatArray {
raw_value_offsets_[i + 1] - pos);
}

/// \brief Return the offsets buffer restricted to this array's logical range.
///
/// The returned buffer is a zero-copy slice of `buffers[1]` that begins at the
/// offset entry of the first value and holds `length + 1` entries, so the end
/// of the last value is still addressable. The offsets are not rebased: they
/// keep pointing into the original (unsliced) data buffer.
///
/// \param[in] pool unused; slicing does not allocate
/// \return the sliced offsets buffer
Result<std::shared_ptr<Buffer>> SlicedValues(MemoryPool* pool) const override {
  // N values are delimited by N + 1 offsets (value i spans
  // [offsets[i], offsets[i + 1])), so one extra entry has to be kept.
  const int64_t num_offsets = data_->length + 1;
  return SliceBuffer(data_->buffers[1], data_->offset * sizeof(offset_type),
                     num_offsets * sizeof(offset_type));
}

/// \brief Get binary value as a std::string
///
/// \param i the value index
Expand Down Expand Up @@ -224,6 +229,11 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray {

/// Pointer to the first value of this array's logical range: raw_values_
/// advanced by `offset` elements of byte_width_ bytes each.
const uint8_t* raw_values() const { return raw_values_ + data_->offset * byte_width_; }

/// \brief Return the values buffer restricted to this array's logical range.
///
/// Zero-copy: both the start and the size of the slice are expressed in
/// bytes, i.e. element counts scaled by byte_width_.
///
/// \param[in] pool unused; slicing does not allocate
Result<std::shared_ptr<Buffer>> SlicedValues(MemoryPool* pool) const override {
  const auto first_byte = data_->offset * byte_width_;
  const auto num_bytes = data_->length * byte_width_;
  return SliceBuffer(data_->buffers[1], first_byte, num_bytes);
}

protected:
void SetData(const std::shared_ptr<ArrayData>& data) {
this->PrimitiveArray::SetData(data);
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/array/array_binary_test.cc
Expand Up @@ -247,6 +247,26 @@ class TestStringArray : public ::testing::Test {
ASSERT_EQ(arr->GetString(0), "b");
}

void TestSlicedValues() {
BuilderType builder;

ASSERT_OK(builder.Append("abc"));
ASSERT_OK(builder.Append("efghi"));
ASSERT_OK(builder.Append("jklmno"));
ASSERT_OK(builder.Append("jkl1"));
ASSERT_OK(builder.Append("xyz"));

std::shared_ptr<Array> array;
ASSERT_OK(builder.Finish(&array));
auto sliced = array->Slice(2, 2);
std::shared_ptr<ArrayData> array_data = sliced->data()->Copy();
ASSERT_OK_AND_ASSIGN(array_data->buffers[1],
internal::checked_pointer_cast<FlatArray>(sliced)->SlicedValues(
default_memory_pool()));
array_data->offset = 0;
ASSERT_ARRAYS_EQUAL(*::arrow::MakeArray(array_data), *sliced);
}

Status ValidateFull(int64_t length, std::vector<offset_type> offsets,
util::string_view data, int64_t offset = 0) {
ArrayType arr(length, Buffer::Wrap(offsets), std::make_shared<Buffer>(data),
Expand Down Expand Up @@ -351,6 +371,8 @@ TYPED_TEST(TestStringArray, CompareNullByteSlots) { this->TestCompareNullByteSlo

TYPED_TEST(TestStringArray, TestSliceGetString) { this->TestSliceGetString(); }

// Runs the SlicedValues() check on a sliced array for every type the
// TestStringArray fixture is instantiated with.
TYPED_TEST(TestStringArray, TestSlicedValues) { this->TestSlicedValues(); }

TYPED_TEST(TestStringArray, TestValidateOffsets) { this->TestValidateOffsets(); }

TYPED_TEST(TestStringArray, TestValidateData) { this->TestValidateData(); }
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/array/array_primitive.cc
Expand Up @@ -76,6 +76,15 @@ int64_t BooleanArray::true_count() const {
}
}

// Returns the bit-packed values restricted to this array's logical range.
//
// When the array offset is not a multiple of 8 the first value does not start
// on a byte boundary, so the bits are copied into a new bitmap allocated from
// `pool`. Otherwise a zero-copy slice of buffers[1] is returned.
Result<std::shared_ptr<Buffer>> BooleanArray::SlicedValues(MemoryPool* pool) const {
  const int64_t bit_offset = data_->offset;
  const int64_t num_bits = data_->length;

  if (!BitUtil::IsMultipleOf8(bit_offset)) {
    return internal::CopyBitmap(pool, data_->buffers[1]->data(), bit_offset, num_bits);
  }

  return SliceBuffer(data_->buffers[1], BitUtil::BytesForBits(bit_offset),
                     BitUtil::BytesForBits(num_bits));
}

// ----------------------------------------------------------------------
// Day time interval

Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/array/array_primitive.h
Expand Up @@ -57,6 +57,11 @@ class NumericArray : public PrimitiveArray {
return reinterpret_cast<const value_type*>(raw_values_) + data_->offset;
}

/// \brief Return the values buffer restricted to this array's logical range.
///
/// Zero-copy: the slice starts `offset` elements into buffers[1] and spans
/// `length` elements, each sizeof(value_type) bytes wide.
///
/// \param[in] pool unused; slicing does not allocate
Result<std::shared_ptr<Buffer>> SlicedValues(MemoryPool* pool) const override {
  const auto first_byte = data_->offset * sizeof(value_type);
  const auto num_bytes = data_->length * sizeof(value_type);
  return SliceBuffer(data_->buffers[1], first_byte, num_bytes);
}

/// Return the i-th value, read from raw_values(). No bounds or validity
/// check is performed here.
value_type Value(int64_t i) const { return raw_values()[i]; }

// For API compatibility with BinaryArray etc.
Expand All @@ -82,6 +87,7 @@ class ARROW_EXPORT BooleanArray : public PrimitiveArray {
i + data_->offset);
}

/// Override of FlatArray::SlicedValues. Declared here; the definition is not
/// inline.
Result<std::shared_ptr<Buffer>> SlicedValues(MemoryPool* pool) const override;
/// Same as Value(i).
bool GetView(int64_t i) const { return Value(i); }

/// \brief Return the number of false (0) values among the valid
Expand Down Expand Up @@ -113,6 +119,11 @@ class ARROW_EXPORT DayTimeIntervalArray : public PrimitiveArray {
/// Return the i-th interval. Declared here; the definition is not inline.
TypeClass::DayMilliseconds GetValue(int64_t i) const;
/// Same as GetValue(i).
TypeClass::DayMilliseconds Value(int64_t i) const { return GetValue(i); }

/// \brief Return the values buffer restricted to this array's logical range.
///
/// Zero-copy slice of buffers[1]. Offsets and lengths are scaled by the size
/// of one stored element, TypeClass::DayMilliseconds -- not by the size of
/// TypeClass itself, which is the data-type class rather than the value type.
///
/// \param[in] pool unused; slicing does not allocate
/// \return the sliced values buffer
Result<std::shared_ptr<Buffer>> SlicedValues(MemoryPool* pool) const override {
  constexpr int64_t kElementWidth = sizeof(TypeClass::DayMilliseconds);
  return SliceBuffer(data_->buffers[1], data_->offset * kElementWidth,
                     data_->length * kElementWidth);
}

// For compatibility with Take kernel: same result as GetValue(i), exposed
// under the GetView name.
TypeClass::DayMilliseconds GetView(int64_t i) const { return GetValue(i); }

Expand Down
58 changes: 58 additions & 0 deletions cpp/src/arrow/array/array_test.cc
Expand Up @@ -1887,6 +1887,18 @@ TEST_F(TestFWBinaryArray, ArrayDataVisitorSliced) {
ARROW_UNUSED(visitor); // Workaround weird MSVC warning
}

// A sliced fixed-size-binary array must be reproducible from the buffer that
// SlicedValues() returns when it is combined with an ArrayData offset of 0.
TEST_F(TestFWBinaryArray, ValuesSliced) {
  const auto width3 = fixed_size_binary(3);
  const auto full = ArrayFromJSON(width3, R"(["abc", "def", "ghi", "jkl"])");
  const auto sliced = full->Slice(1, 2);
  const auto flat = checked_pointer_cast<FlatArray>(sliced);

  // Work on a copy so the slice under test keeps its own offset and buffers.
  std::shared_ptr<ArrayData> rebased = sliced->data()->Copy();
  ASSERT_OK_AND_ASSIGN(rebased->buffers[1], flat->SlicedValues(default_memory_pool()));
  rebased->offset = 0;

  ASSERT_ARRAYS_EQUAL(*MakeArray(rebased), *sliced);
}

// ----------------------------------------------------------------------
// AdaptiveInt tests

Expand Down Expand Up @@ -2602,4 +2614,50 @@ TEST(TestRechunkArraysConsistently, Plain) {
}
}

// Slice starting at bit 8, i.e. on a byte boundary: SlicedValues() can return
// a plain slice of the values bitmap.
TEST(TestSlicedValues, BooleanEvenOffset) {
  const auto full = ArrayFromJSON(
      boolean(), R"([false, false, true, false, true, true, true, true, false, true])");
  const auto sliced = full->Slice(8, 2);
  const auto flat = checked_pointer_cast<FlatArray>(sliced);

  // Work on a copy so the slice under test keeps its own offset and buffers.
  std::shared_ptr<ArrayData> rebased = sliced->data()->Copy();
  ASSERT_OK_AND_ASSIGN(rebased->buffers[1], flat->SlicedValues(default_memory_pool()));
  rebased->offset = 0;

  ASSERT_ARRAYS_EQUAL(*MakeArray(rebased), *sliced);
}

// Slice starting at bit 1, i.e. inside a byte: SlicedValues() has to copy the
// bits into a fresh bitmap instead of slicing.
TEST(TestSlicedValues, BooleanUnevenOffset) {
  const auto full =
      ArrayFromJSON(boolean(), R"([false, false, true, false, true, true, true, true])");
  const auto sliced = full->Slice(1, 5);
  const auto flat = checked_pointer_cast<FlatArray>(sliced);

  // Work on a copy so the slice under test keeps its own offset and buffers.
  std::shared_ptr<ArrayData> rebased = sliced->data()->Copy();
  ASSERT_OK_AND_ASSIGN(rebased->buffers[1], flat->SlicedValues(default_memory_pool()));
  rebased->offset = 0;

  ASSERT_ARRAYS_EQUAL(*MakeArray(rebased), *sliced);
}

class NumericSlicedValuesTest
: public ::testing::TestWithParam<std::shared_ptr<DataType>> {};
TEST_P(NumericSlicedValuesTest, SlicedValues) {
auto type = GetParam();

auto sliced = ArrayFromJSON(type, R"([1, 2, 3, 4])")->Slice(1, 2);
std::shared_ptr<ArrayData> array_data = sliced->data()->Copy();
ASSERT_OK_AND_ASSIGN(
array_data->buffers[1],
checked_pointer_cast<FlatArray>(sliced)->SlicedValues(default_memory_pool()));
array_data->offset = 0;
ASSERT_ARRAYS_EQUAL(*MakeArray(array_data), *sliced);
}

// Cover every integer width in both signednesses plus both floating-point
// widths: each goes through the same NumericArray<T>::SlicedValues template
// with a different sizeof(value_type).
INSTANTIATE_TEST_SUITE_P(NumericSlicedValuesTest, NumericSlicedValuesTest,
                         ::testing::Values(int8(), int16(), int32(), int64(), uint8(),
                                           uint16(), uint32(), uint64(), float32(),
                                           float64()));

} // namespace arrow
31 changes: 31 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Expand Up @@ -38,6 +38,7 @@
#include "arrow/testing/random.h"
#include "arrow/testing/util.h"
#include "arrow/type_traits.h"
#include "arrow/util/bitmap_builders.h"
#include "arrow/util/decimal.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
Expand Down Expand Up @@ -3199,6 +3200,36 @@ TEST(TestArrowReaderAdHoc, HandleDictPageOffsetZero) {
TryReadDataFile(test::get_data_file("dict-page-offset-zero.parquet"));
}

// Regression test for ARROW-10493: round-trips a nullable struct column with
// a single nullable utf8 child through Parquet using a write batch size of 2
// and checks that the table read back equals the one written.
TEST(TestArrowReaderAdHoc, WriteBatchedNestedNullableStringColumn) {
// ARROW-10493
auto type =
::arrow::struct_({::arrow::field("inner", ::arrow::utf8(), /*nullable=*/true)});
auto outer_array = ::arrow::ArrayFromJSON(type,
R"([{"inner": "a"},
{"inner": null},
{"inner": "b"},
{"inner": "c"},
{"inner": null},
{"inner": null},
{"inner": "d"},
null])");
auto inner_array =
std::static_pointer_cast<::arrow::StructArray>(outer_array)->field(0);
// Install explicit validity bitmaps, one entry per row: the child is marked
// valid at rows 0, 2, 3 and 6; the struct itself is valid everywhere except
// the last row.
ASSERT_OK_AND_ASSIGN(inner_array->data()->buffers[0],
::arrow::internal::BytesToBits({1, 0, 1, 1, 0, 0, 1, 0}));
ASSERT_OK_AND_ASSIGN(outer_array->data()->buffers[0],
::arrow::internal::BytesToBits({1, 1, 1, 1, 1, 1, 1, 0}));
bkietz marked this conversation as resolved.
Show resolved Hide resolved

auto expected = Table::Make(
::arrow::schema({::arrow::field("outer", type, /*nullable=*/true)}), {outer_array});

// NOTE(review): the batch size of 2 is presumably chosen so the 8 rows are
// written in several batches, exercising the sliced-array path -- confirm.
auto write_props = WriterProperties::Builder().write_batch_size(2)->build();

std::shared_ptr<Table> actual;
// All rows go into a single row group.
DoRoundtrip(expected, /*row_group_size=*/outer_array->length(), &actual, write_props);
::arrow::AssertTablesEqual(*expected, *actual, /*same_chunk_layout=*/false);
}

// Value-parameterized fixture; each parameter is a
// (std::string, std::shared_ptr<DataType>) tuple.
class TestArrowReaderAdHocSparkAndHvr
: public ::testing::TestWithParam<
std::tuple<std::string, std::shared_ptr<DataType>>> {};
Expand Down
15 changes: 10 additions & 5 deletions cpp/src/parquet/column_writer.cc
Expand Up @@ -56,6 +56,7 @@ using arrow::Datum;
using arrow::Status;
using arrow::BitUtil::BitWriter;
using arrow::internal::checked_cast;
using arrow::internal::checked_pointer_cast;
using arrow::util::RleEncoder;

namespace parquet {
Expand Down Expand Up @@ -1222,12 +1223,17 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
}

std::shared_ptr<Array> MaybeReplaceValidity(std::shared_ptr<Array> array,
int64_t new_null_count) {
int64_t new_null_count,
arrow::MemoryPool* memory_pool) {
if (bits_buffer_ == nullptr) {
return array;
}
std::vector<std::shared_ptr<Buffer>> buffers = array->data()->buffers;
buffers[0] = bits_buffer_;
DCHECK_GT(buffers.size(), 1);
PARQUET_ASSIGN_OR_THROW(
buffers[1],
checked_pointer_cast<arrow::FlatArray>(array)->SlicedValues(memory_pool));
// Should be a leaf array.
DCHECK_EQ(array->num_fields(), 0);
chrisavl marked this conversation as resolved.
Show resolved Hide resolved
return arrow::MakeArray(std::make_shared<ArrayData>(
Expand Down Expand Up @@ -1382,7 +1388,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
AddIfNotNull(rep_levels, offset));
std::shared_ptr<Array> writeable_indices =
indices->Slice(value_offset, batch_num_spaced_values);
writeable_indices = MaybeReplaceValidity(writeable_indices, null_count);
writeable_indices =
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool);
dict_encoder->PutIndices(*writeable_indices);
CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
value_offset += batch_num_spaced_values;
Expand Down Expand Up @@ -1809,7 +1816,7 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
AddIfNotNull(rep_levels, offset));
std::shared_ptr<Array> data_slice =
array.Slice(value_offset, batch_num_spaced_values);
data_slice = MaybeReplaceValidity(data_slice, null_count);
data_slice = MaybeReplaceValidity(data_slice, null_count, ctx->memory_pool);

current_encoder_->Put(*data_slice);
if (page_statistics_ != nullptr) {
Expand Down Expand Up @@ -1855,8 +1862,6 @@ struct SerializeFunctor<
// ----------------------------------------------------------------------
// Write Arrow to Decimal128

using ::arrow::internal::checked_pointer_cast;

// Requires a custom serializer because decimal128 in parquet are in big-endian
// format. Thus, a temporary local buffer is required.
template <typename ParquetType, typename ArrowType>
Expand Down