Skip to content

Commit

Permalink
Adapt to apache-arrow 15 (v6d-io#1729)
Browse files Browse the repository at this point in the history
Fixes v6d-io#1728

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow authored and dashanji committed Feb 2, 2024
1 parent 95b5d41 commit 17a9fbe
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 0 deletions.
34 changes: 34 additions & 0 deletions modules/basic/ds/arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ class ArrowArrayBuilderVisitor {
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
Status Visit(const arrow::StringViewType* type) {
return Status::NotImplemented(
"Type not implemented: " + std::to_string(type->id()) + ", " +
type->ToString());
}
#endif

Status Visit(const arrow::LargeStringType*) {
builder_ = std::make_shared<LargeStringArrayBuilder>(client_, array_);
return Status::OK();
Expand All @@ -149,6 +157,14 @@ class ArrowArrayBuilderVisitor {
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
Status Visit(const arrow::BinaryViewType* type) {
return Status::NotImplemented(
"Type not implemented: " + std::to_string(type->id()) + ", " +
type->ToString());
}
#endif

Status Visit(const arrow::LargeBinaryType*) {
builder_ = std::make_shared<LargeBinaryArrayBuilder>(client_, array_);
return Status::OK();
Expand All @@ -163,10 +179,28 @@ class ArrowArrayBuilderVisitor {
builder_ = std::make_shared<ListArrayBuilder>(client_, array_);
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
Status Visit(const arrow::ListViewType* type) {
return Status::NotImplemented(
"Type not implemented: " + std::to_string(type->id()) + ", " +
type->ToString());
}
#endif

Status Visit(const arrow::LargeListType*) {
builder_ = std::make_shared<LargeListArrayBuilder>(client_, array_);
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
Status Visit(const arrow::LargeListViewType* type) {
return Status::NotImplemented(
"Type not implemented: " + std::to_string(type->id()) + ", " +
type->ToString());
}
#endif

Status Visit(const arrow::FixedSizeListType*) {
builder_ = std::make_shared<FixedSizeListArrayBuilder>(client_, array_);
return Status::OK();
Expand Down
252 changes: 252 additions & 0 deletions modules/basic/ds/arrow_shim/concatenate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
#include "arrow/util/bit_block_counter.h"
#endif
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
Expand All @@ -59,10 +62,16 @@
#if defined(ARROW_VERSION) && ARROW_VERSION >= 9000000
#include "arrow/util/int_util_overflow.h"
#endif
#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
#include "arrow/util/list_util.h"
#endif
#include "arrow/util/logging.h"
#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
#include "arrow/util/ree_util.h"
#endif
#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
#include "arrow/visit_data_inline.h"
#endif
#if defined(ARROW_VERSION) && ARROW_VERSION >= 7000000
#include "arrow/visit_type_inline.h"
#else
Expand Down Expand Up @@ -225,6 +234,163 @@ Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
return Status::OK();
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
// for "arrow/util/slice_util_internal.h"
static inline Status CheckSliceParams(int64_t object_length,
int64_t slice_offset,
int64_t slice_length,
const char* object_name) {
if (ARROW_PREDICT_FALSE(slice_offset < 0)) {
return Status::IndexError("Negative ", object_name, " slice offset");
}
if (ARROW_PREDICT_FALSE(slice_length < 0)) {
return Status::IndexError("Negative ", object_name, " slice length");
}
int64_t offset_plus_length;
if (ARROW_PREDICT_FALSE(internal::AddWithOverflow(slice_offset, slice_length,
&offset_plus_length))) {
return Status::IndexError(object_name, " slice would overflow");
}
if (ARROW_PREDICT_FALSE(offset_plus_length > object_length)) {
return Status::IndexError(object_name, " slice would exceed ", object_name,
" length");
}
return Status::OK();
}

int64_t SumBufferSizesInBytes(const BufferVector& buffers) {
int64_t size = 0;
for (const auto& buffer : buffers) {
size += buffer->size();
}
return size;
}

template <typename offset_type>
Status PutListViewOffsets(const ArrayData& input, offset_type* sizes,
const Buffer& src, offset_type displacement,
offset_type* dst);

// Concatenate buffers holding list-view offsets into a single buffer of offsets
//
// value_ranges contains the relevant ranges of values in the child array
// actually referenced to by the views. Most commonly, these ranges will start
// from 0, but when that is not the case, we need to adjust the displacement of
// offsets. The concatenated child array does not contain values from the
// beginning if they are not referenced to by any view.
//
// The child arrays and the sizes buffer are used to ensure we can trust the
// offsets in offset_buffers to be within the valid range.
//
// This function also mutates sizes so that null list-view entries have size 0.
//
// \param[in] in The child arrays
// \param[in,out] sizes The concatenated sizes buffer
template <typename offset_type>
Status ConcatenateListViewOffsets(const ArrayDataVector& in, offset_type* sizes,
const BufferVector& offset_buffers,
const std::vector<Range>& value_ranges,
MemoryPool* pool,
std::shared_ptr<Buffer>* out) {
DCHECK_EQ(offset_buffers.size(), value_ranges.size());

// Allocate resulting offsets buffer and initialize it with zeros
const int64_t out_size_in_bytes = SumBufferSizesInBytes(offset_buffers);
ARROW_ASSIGN_OR_RAISE(*out, AllocateBuffer(out_size_in_bytes, pool));
memset((*out)->mutable_data(), 0, static_cast<size_t>((*out)->size()));

auto* out_offsets = (*out)->mutable_data_as<offset_type>();

int64_t num_child_values = 0;
int64_t elements_length = 0;
for (size_t i = 0; i < offset_buffers.size(); ++i) {
const auto displacement =
static_cast<offset_type>(num_child_values - value_ranges[i].offset);
RETURN_NOT_OK(PutListViewOffsets(*in[i], /*sizes=*/sizes + elements_length,
/*src=*/*offset_buffers[i], displacement,
/*dst=*/out_offsets + elements_length));
elements_length += offset_buffers[i]->size() / sizeof(offset_type);
num_child_values += value_ranges[i].length;
if (num_child_values > std::numeric_limits<offset_type>::max()) {
return Status::Invalid("offset overflow while concatenating arrays");
}
}
DCHECK_EQ(elements_length,
static_cast<int64_t>(out_size_in_bytes / sizeof(offset_type)));

return Status::OK();
}

template <typename offset_type>
Status PutListViewOffsets(const ArrayData& input, offset_type* sizes,
const Buffer& src, offset_type displacement,
offset_type* dst) {
if (src.size() == 0) {
return Status::OK();
}
const auto& validity_buffer = input.buffers[0];
if (validity_buffer) {
// Ensure that it is safe to access all the bits in the validity bitmap of
// input.
RETURN_NOT_OK(CheckSliceParams(/*size=*/8 * validity_buffer->size(),
input.offset, input.length, "buffer"));
}

const auto offsets = src.data_as<offset_type>();
DCHECK_EQ(static_cast<int64_t>(src.size() / sizeof(offset_type)),
input.length);

auto visit_not_null = [&](int64_t position) {
if (sizes[position] > 0) {
// NOTE: Concatenate can be called during IPC reads to append delta
// dictionaries. Avoid UB on non-validated input by doing the addition in
// the unsigned domain. (the result can later be validated using
// Array::ValidateFull)
const auto displaced_offset =
SafeSignedAdd(offsets[position], displacement);
// displaced_offset>=0 is guaranteed by RangeOfValuesUsed returning the
// smallest offset of valid and non-empty list-views.
DCHECK_GE(displaced_offset, 0);
dst[position] = displaced_offset;
} else {
// Do nothing to leave the dst[position] as 0.
}
};

const auto* validity =
validity_buffer ? validity_buffer->data_as<uint8_t>() : nullptr;
internal::OptionalBitBlockCounter bit_counter(validity, input.offset,
input.length);
int64_t position = 0;
while (position < input.length) {
internal::BitBlockCount block = bit_counter.NextBlock();
if (block.AllSet()) {
for (int64_t i = 0; i < block.length; ++i, ++position) {
visit_not_null(position);
}
} else if (block.NoneSet()) {
// NOTE: we don't have to do anything for the null entries regarding the
// offsets as the buffer is initialized to 0 when it is allocated.

// Zero-out the sizes of the null entries to ensure these sizes are not
// greater than the new values length of the concatenated array.
memset(sizes + position, 0, block.length * sizeof(offset_type));
position += block.length;
} else {
for (int64_t i = 0; i < block.length; ++i, ++position) {
if (bit_util::GetBit(validity, input.offset + position)) {
visit_not_null(position);
} else {
// Zero-out the size at position.
sizes[position] = 0;
}
}
}
}
return Status::OK();
}
#endif

class ConcatenateImpl {
public:
ConcatenateImpl(ArrayDataVector&& in, MemoryPool* pool)
Expand Down Expand Up @@ -280,6 +446,52 @@ class ConcatenateImpl {
.Value(&out_->buffers[2]);
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
Status Visit(const BinaryViewType&) {
out_->buffers.resize(2);

for (const auto& in_data : in_) {
for (const auto& buf : util::span(in_data->buffers).subspan(2)) {
out_->buffers.push_back(buf);
}
}

ARROW_ASSIGN_OR_RAISE(auto view_buffers, Buffers(1, BinaryViewType::kSize));
ARROW_ASSIGN_OR_RAISE(auto view_buffer,
ConcatenateBuffers(view_buffers, pool_));

auto* views = view_buffer->mutable_data_as<BinaryViewType::c_type>();
size_t preceding_buffer_count = 0;

int64_t i = in_[0]->length;
for (size_t in_index = 1; in_index < in_.size(); ++in_index) {
preceding_buffer_count += in_[in_index - 1]->buffers.size() - 2;

for (int64_t end_i = i + in_[in_index]->length; i < end_i; ++i) {
if (views[i].is_inline())
continue;
views[i].ref.buffer_index =
SafeSignedAdd(views[i].ref.buffer_index,
static_cast<int32_t>(preceding_buffer_count));
}
}

if (out_->buffers[0] != nullptr) {
i = in_[0]->length;
VisitNullBitmapInline(
out_->buffers[0]->data(), i, out_->length - i, out_->null_count,
[&] { ++i; },
[&] {
views[i++] =
{}; // overwrite views under null bits with an empty view
});
}

out_->buffers[1] = std::move(view_buffer);
return Status::OK();
}
#endif

Status Visit(const LargeBinaryType&) {
std::vector<Range> value_ranges;
ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, sizeof(int64_t)));
Expand Down Expand Up @@ -317,6 +529,46 @@ class ConcatenateImpl {
.Concatenate(&out_->child_data[0]);
}

#if defined(ARROW_VERSION) && ARROW_VERSION >= 15000000
template <typename T>
enable_if_list_view<T, Status> Visit(const T& type) {
using offset_type = typename T::offset_type;
out_->buffers.resize(3);
out_->child_data.resize(1);

// Calculate the ranges of values that each list-view array uses
std::vector<Range> value_ranges;
value_ranges.reserve(in_.size());
for (const auto& input : in_) {
ArraySpan input_span(*input);
Range range;
ARROW_ASSIGN_OR_RAISE(std::tie(range.offset, range.length),
list_util::internal::RangeOfValuesUsed(input_span));
value_ranges.push_back(range);
}

// Concatenate the values
ARROW_ASSIGN_OR_RAISE(ArrayDataVector value_data,
ChildData(0, value_ranges));
RETURN_NOT_OK(ConcatenateImpl(std::move(value_data), pool_)
.Concatenate(&out_->child_data[0]));
out_->child_data[0]->type = type.value_type();

// Concatenate the sizes first
ARROW_ASSIGN_OR_RAISE(auto size_buffers, Buffers(2, sizeof(offset_type)));
RETURN_NOT_OK(
ConcatenateBuffers(size_buffers, pool_).Value(&out_->buffers[2]));

// Concatenate the offsets
ARROW_ASSIGN_OR_RAISE(auto offset_buffers, Buffers(1, sizeof(offset_type)));
RETURN_NOT_OK(ConcatenateListViewOffsets<offset_type>(
in_, /*sizes=*/out_->buffers[2]->mutable_data_as<offset_type>(),
offset_buffers, value_ranges, pool_, &out_->buffers[1]));

return Status::OK();
}
#endif

Status Visit(const StructType& s) {
for (int i = 0; i < s.num_fields(); ++i) {
ARROW_ASSIGN_OR_RAISE(auto child_data, ChildData(i));
Expand Down

0 comments on commit 17a9fbe

Please sign in to comment.