Skip to content

Commit

Permalink
GH-36502: [C++] Add run-end encoded array support to ReferencedByteRa…
Browse files Browse the repository at this point in the history
…nges (#36521)

### Rationale for this change

Fix for #36502.

### What changes are included in this PR?

Fix and C++ tests.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Addition of a new function to the `ree_util` namespace.
* Closes: #36502

Lead-authored-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
felipecrv and bkietz committed Jul 11, 2023
1 parent 0e0df29 commit e4bf235
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 10 deletions.
24 changes: 23 additions & 1 deletion cpp/src/arrow/array/array_run_end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstdint>
#include <cstring>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/array.h"
Expand Down Expand Up @@ -127,34 +128,55 @@ TEST_P(TestRunEndEncodedArray, FromRunEndsAndValues) {
RunEndEncodedArray::Make(30, run_end_values, ArrayFromJSON(int32(), "[2, 0]")));
}

TEST_P(TestRunEndEncodedArray, FindOffsetAndLength) {
TEST_P(TestRunEndEncodedArray, FindPhysicalRange) {
auto run_ends = ArrayFromJSON(run_end_type, "[100, 200, 300, 400, 500]");
auto values = ArrayFromJSON(utf8(), R"(["Hello", "beautiful", "world", "of", "REE"])");
ASSERT_OK_AND_ASSIGN(auto ree_array, RunEndEncodedArray::Make(500, run_ends, values));

auto Range = [](int64_t offset, int64_t length) -> std::pair<int64_t, int64_t> {
return std::make_pair(offset, length);
};
ASSERT_EQ(ree_array->FindPhysicalOffset(), 0);
ASSERT_EQ(ree_array->FindPhysicalLength(), 5);
ASSERT_EQ(ree_util::FindPhysicalRange(*ree_array->data(), ree_array->offset(),
ree_array->length()),
Range(0, 5));

auto slice = std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(199, 5));
ASSERT_EQ(slice->FindPhysicalOffset(), 1);
ASSERT_EQ(slice->FindPhysicalLength(), 2);
ASSERT_EQ(ree_util::FindPhysicalRange(*slice->data(), slice->offset(), slice->length()),
Range(1, 2));

auto slice2 = std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(199, 101));
ASSERT_EQ(slice2->FindPhysicalOffset(), 1);
ASSERT_EQ(slice2->FindPhysicalLength(), 2);
ASSERT_EQ(
ree_util::FindPhysicalRange(*slice2->data(), slice2->offset(), slice2->length()),
Range(1, 2));

auto slice3 = std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(400, 100));
ASSERT_EQ(slice3->FindPhysicalOffset(), 4);
ASSERT_EQ(slice3->FindPhysicalLength(), 1);
ASSERT_EQ(
ree_util::FindPhysicalRange(*slice3->data(), slice3->offset(), slice3->length()),
Range(4, 1));

auto slice4 = std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(0, 150));
ASSERT_EQ(slice4->FindPhysicalOffset(), 0);
ASSERT_EQ(slice4->FindPhysicalLength(), 2);
ASSERT_EQ(
ree_util::FindPhysicalRange(*slice4->data(), slice4->offset(), slice4->length()),
Range(0, 2));

auto zero_length_at_end =
std::dynamic_pointer_cast<RunEndEncodedArray>(ree_array->Slice(500, 0));
ASSERT_EQ(zero_length_at_end->FindPhysicalOffset(), 5);
ASSERT_EQ(zero_length_at_end->FindPhysicalLength(), 0);
ASSERT_EQ(ree_util::FindPhysicalRange(*zero_length_at_end->data(),
zero_length_at_end->offset(),
zero_length_at_end->length()),
Range(5, 0));
}

TEST_P(TestRunEndEncodedArray, LogicalRunEnds) {
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/arrow/util/byte_size.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/util/logging.h"
#include "arrow/util/ree_util.h"
#include "arrow/visit_type_inline.h"

namespace arrow {
Expand Down Expand Up @@ -294,6 +295,20 @@ struct GetByteRangesArray {
return Status::OK();
}

Status Visit(const RunEndEncodedType& type) const {
auto [phys_offset, phys_length] = ree_util::FindPhysicalRange(input, offset, length);
for (int i = 0; i < type.num_fields(); i++) {
GetByteRangesArray child{*input.child_data[i],
/*offset=*/input.child_data[i]->offset + phys_offset,
/*length=*/phys_length,
range_starts,
range_offsets,
range_lengths};
RETURN_NOT_OK(VisitTypeInline(*type.field(i)->type(), &child));
}
return Status::OK();
}

Status Visit(const ExtensionType& extension_type) const {
GetByteRangesArray storage{input, offset, length,
range_starts, range_offsets, range_lengths};
Expand Down
27 changes: 27 additions & 0 deletions cpp/src/arrow/util/byte_size_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,33 @@ TEST(ByteRanges, SparseUnion) {
});
}

TEST(ByteRanges, RunEndEncodedArray) {
auto run_ends =
ArrayFromJSON(int32(), "[-1, -1, 100, 200, 300, 400, 500, -1]")->Slice(2, 5);
auto values = ArrayFromJSON(int32(), R"([-1, 1, 2, 3, 4, 5, -1, -1])")->Slice(1, 5);
ASSERT_OK_AND_ASSIGN(auto ree_array, RunEndEncodedArray::Make(500, run_ends, values));
CheckBufferRanges(ree_array, {
{0, 8, 20},
{1, 4, 20},
});
CheckBufferRanges(ree_array->Slice(50), {
{0, 8, 20},
{1, 4, 20},
});
CheckBufferRanges(ree_array->Slice(100, 400), {
{0, 12, 16},
{1, 8, 16},
});
CheckBufferRanges(ree_array->Slice(100, 301), {
{0, 12, 16},
{1, 8, 16},
});
CheckBufferRanges(ree_array->Slice(100, 300), {
{0, 12, 12},
{1, 8, 12},
});
}

TEST(ByteRanges, ExtensionArray) {
std::shared_ptr<Array> ext_arr = ExampleUuid();
CheckBufferRanges(ext_arr, {{0, 0, 1}, {1, 0, 64}});
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/arrow/util/ree_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ int64_t FindPhysicalLength(const ArraySpan& span) {
return internal::FindPhysicalLength<int64_t>(span);
}

std::pair<int64_t, int64_t> FindPhysicalRange(const ArraySpan& span, int64_t offset,
int64_t length) {
const auto& run_ends_span = RunEndsArray(span);
auto type_id = run_ends_span.type->id();
if (type_id == Type::INT16) {
auto* run_ends = run_ends_span.GetValues<int16_t>(1);
return internal::FindPhysicalRange<int16_t>(run_ends, run_ends_span.length, length,
offset);
}
if (type_id == Type::INT32) {
auto* run_ends = run_ends_span.GetValues<int32_t>(1);
return internal::FindPhysicalRange<int32_t>(run_ends, run_ends_span.length, length,
offset);
}
DCHECK_EQ(type_id, Type::INT64);
auto* run_ends = run_ends_span.GetValues<int64_t>(1);
return internal::FindPhysicalRange<int64_t>(run_ends, run_ends_span.length, length,
offset);
}

namespace {

template <typename RunEndCType>
Expand Down
41 changes: 32 additions & 9 deletions cpp/src/arrow/util/ree_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,38 @@ int64_t FindPhysicalIndex(const RunEndCType* run_ends, int64_t run_ends_size, in
return result;
}

/// \brief Uses binary-search to calculate the number of physical values (and
/// \brief Uses binary-search to calculate the range of physical values (and
/// run-ends) necessary to represent the logical range of values from
/// offset to length
///
/// \return a pair of physical offset and physical length
template <typename RunEndCType>
int64_t FindPhysicalLength(const RunEndCType* run_ends, int64_t run_ends_size,
int64_t length, int64_t offset) {
std::pair<int64_t, int64_t> FindPhysicalRange(const RunEndCType* run_ends,
int64_t run_ends_size, int64_t length,
int64_t offset) {
const int64_t physical_offset =
FindPhysicalIndex<RunEndCType>(run_ends, run_ends_size, 0, offset);
// The physical length is calculated by finding the offset of the last element
// and adding 1 to it, so first we ensure there is at least one element.
if (length == 0) {
return 0;
return {physical_offset, 0};
}
const int64_t physical_offset =
FindPhysicalIndex<RunEndCType>(run_ends, run_ends_size, 0, offset);
const int64_t physical_index_of_last = FindPhysicalIndex<RunEndCType>(
run_ends + physical_offset, run_ends_size - physical_offset, length - 1, offset);

assert(physical_index_of_last < run_ends_size - physical_offset);
return physical_index_of_last + 1;
return {physical_offset, physical_index_of_last + 1};
}

/// \brief Uses binary-search to calculate the number of physical values (and
/// run-ends) necessary to represent the logical range of values from
/// offset to length
template <typename RunEndCType>
int64_t FindPhysicalLength(const RunEndCType* run_ends, int64_t run_ends_size,
int64_t length, int64_t offset) {
auto [_, physical_length] =
FindPhysicalRange<RunEndCType>(run_ends, run_ends_size, length, offset);
return physical_length;
}

/// \brief Find the physical index into the values array of the REE ArraySpan
Expand Down Expand Up @@ -125,7 +139,8 @@ int64_t FindPhysicalLength(const ArraySpan& span) {
/// \brief Find the physical index into the values array of the REE ArraySpan
///
/// This function uses binary-search, so it has a O(log N) cost.
int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t absolute_offset);
ARROW_EXPORT int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i,
int64_t absolute_offset);

/// \brief Find the physical length of an REE ArraySpan
///
Expand All @@ -136,7 +151,15 @@ int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t absolute_off
/// Avoid calling this function if the physical length can be estabilished in
/// some other way (e.g. when iterating over the runs sequentially until the
/// end). This function uses binary-search, so it has a O(log N) cost.
int64_t FindPhysicalLength(const ArraySpan& span);
ARROW_EXPORT int64_t FindPhysicalLength(const ArraySpan& span);

/// \brief Find the physical range of physical values referenced by the REE in
/// the logical range from offset to offset + length
///
/// \return a pair of physical offset and physical length
ARROW_EXPORT std::pair<int64_t, int64_t> FindPhysicalRange(const ArraySpan& span,
int64_t offset,
int64_t length);

template <typename RunEndCType>
class RunEndEncodedArraySpan {
Expand Down

0 comments on commit e4bf235

Please sign in to comment.