Skip to content

Commit

Permalink
ARROW-1558: [C++] Implement boolean filter (selection) kernel, rename…
Browse files Browse the repository at this point in the history
… comparison kernel-related functions

Materializes an array masked by a selection array (for example one produced by the filter kernel)

Author: Benjamin Kietzman <bengilgit@gmail.com>

Closes #4366 from bkietz/1558-Implement-boolean-selection-kernels and squashes the following commits:

032d341 <Benjamin Kietzman> fix doc error
3d92b6e <Benjamin Kietzman> Make FilterKernel public
e8465e5 <Benjamin Kietzman> iwyu: vector
030ac57 <Benjamin Kietzman> filter benchmarks += MinTime(1.0) nanoseconds
7702055 <Benjamin Kietzman> use expanded bitmap for FixedSizeList and List
060313c <Benjamin Kietzman> refactor FilterImpl<StructType> to own child kernels
24f2e85 <Benjamin Kietzman> add larger benchmarks to test for O(N^2) perf
e4d9d85 <Benjamin Kietzman> refactor FilterKernel::Make to use a switch
f833e02 <Benjamin Kietzman> add benchmark for fixed_size_list(int64(), 1)
f424f34 <Benjamin Kietzman> fix nits and typos
3387f21 <Benjamin Kietzman> use new path for concatenate.h
495e521 <Benjamin Kietzman> Add support for filtering MapArray
a8cb993 <Benjamin Kietzman> fix lint error
e3b4022 <Benjamin Kietzman> add filter impls for nested types
a216388 <Benjamin Kietzman> add explicit qualification for MSVC
ccd32a5 <Benjamin Kietzman> add a basic filter benchmark
8a9f379 <Benjamin Kietzman> add a test integrating with arrow::compute::Compare (array-array)
7c50027 <Benjamin Kietzman> add a test integrating with arrow::compute::Compare
6efc4f5 <Benjamin Kietzman> add filter tests with large, random arrays
0f29ab2 <Benjamin Kietzman> rename Mask -> Filter
edf2eb1 <Benjamin Kietzman> rename FilterFunction -> CompareFunction
4b24ca3 <Benjamin Kietzman> revert removal of TakeOptions
4c8ce6d <Benjamin Kietzman> revert submodule
a54741e <Benjamin Kietzman> add some tests with empty masks/take indices
d5c9c14 <Benjamin Kietzman> use checked_cast
223a860 <Benjamin Kietzman> fix typo
c953dca <Benjamin Kietzman> remove empty TakeOptions
db44424 <Benjamin Kietzman> remove empty MaskOptions
13a1969 <Benjamin Kietzman> initial mask kernel impl
  • Loading branch information
bkietz authored and wesm committed Jun 20, 2019
1 parent cd88d2e commit a6b210d
Show file tree
Hide file tree
Showing 18 changed files with 1,644 additions and 470 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ if(ARROW_COMPUTE)
compute/kernels/cast.cc
compute/kernels/compare.cc
compute/kernels/count.cc
compute/kernels/filter.cc
compute/kernels/hash.cc
compute/kernels/filter.cc
compute/kernels/mean.cc
compute/kernels/sum.cc
compute/kernels/take.cc
Expand Down
169 changes: 169 additions & 0 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,175 @@ std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data) {

namespace internal {

// get the maximum buffer length required, then allocate a single zeroed buffer
// to use anywhere a buffer is required
class NullArrayFactory {
public:
struct GetBufferLength {
GetBufferLength(const std::shared_ptr<DataType>& type, int64_t length)
: type_(*type), length_(length), buffer_length_(BitUtil::BytesForBits(length)) {}

operator int64_t() && {
DCHECK_OK(VisitTypeInline(type_, this));
return buffer_length_;
}

template <typename T, typename = decltype(TypeTraits<T>::bytes_required(0))>
Status Visit(const T&) {
return MaxOf(TypeTraits<T>::bytes_required(length_));
}

Status Visit(const ListType& type) {
// list's values array may be empty, but there must be at least one offset of 0
return MaxOf(sizeof(int32_t));
}

Status Visit(const FixedSizeListType& type) {
return MaxOf(GetBufferLength(type.value_type(), type.list_size() * length_));
}

Status Visit(const StructType& type) {
for (const auto& child : type.children()) {
DCHECK_OK(MaxOf(GetBufferLength(child->type(), length_)));
}
return Status::OK();
}

Status Visit(const UnionType& type) {
// type codes
DCHECK_OK(MaxOf(length_));
if (type.mode() == UnionMode::DENSE) {
// offsets
DCHECK_OK(MaxOf(sizeof(int32_t) * length_));
}
for (const auto& child : type.children()) {
DCHECK_OK(MaxOf(GetBufferLength(child->type(), length_)));
}
return Status::OK();
}

Status Visit(const DictionaryType& type) {
DCHECK_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
return MaxOf(GetBufferLength(type.index_type(), length_));
}

Status Visit(const ExtensionType& type) {
// XXX is an extension array's length always == storage length
return MaxOf(GetBufferLength(type.storage_type(), length_));
}

Status Visit(const DataType& type) {
return Status::NotImplemented("construction of all-null ", type);
}

private:
Status MaxOf(int64_t buffer_length) {
if (buffer_length > buffer_length_) {
buffer_length_ = buffer_length;
}
return Status::OK();
}

const DataType& type_;
int64_t length_, buffer_length_;
};

NullArrayFactory(const std::shared_ptr<DataType>& type, int64_t length,
std::shared_ptr<ArrayData>* out)
: type_(type), length_(length), out_(out) {}

Status CreateBuffer() {
int64_t buffer_length = GetBufferLength(type_, length_);
RETURN_NOT_OK(AllocateBuffer(buffer_length, &buffer_));
std::memset(buffer_->mutable_data(), 0, buffer_->size());
return Status::OK();
}

Status Create() {
if (buffer_ == nullptr) {
RETURN_NOT_OK(CreateBuffer());
}
std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_children());
*out_ = ArrayData::Make(type_, length_, {buffer_}, child_data, length_, 0);
return VisitTypeInline(*type_, this);
}

Status Visit(const NullType&) { return Status::OK(); }

Status Visit(const FixedWidthType&) {
(*out_)->buffers.resize(2, buffer_);
return Status::OK();
}

Status Visit(const BinaryType&) {
(*out_)->buffers.resize(3, buffer_);
return Status::OK();
}

Status Visit(const ListType& type) {
(*out_)->buffers.resize(2, buffer_);
return CreateChild(0, length_, &(*out_)->child_data[0]);
}

Status Visit(const FixedSizeListType& type) {
return CreateChild(0, length_ * type.list_size(), &(*out_)->child_data[0]);
}

Status Visit(const StructType& type) {
for (int i = 0; i < type_->num_children(); ++i) {
DCHECK_OK(CreateChild(i, length_, &(*out_)->child_data[i]));
}
return Status::OK();
}

Status Visit(const UnionType& type) {
if (type.mode() == UnionMode::DENSE) {
(*out_)->buffers.resize(3, buffer_);
} else {
(*out_)->buffers.resize(2, buffer_);
}

for (int i = 0; i < type_->num_children(); ++i) {
DCHECK_OK(CreateChild(i, length_, &(*out_)->child_data[i]));
}
return Status::OK();
}

Status Visit(const DictionaryType& type) {
(*out_)->buffers.resize(2, buffer_);
std::shared_ptr<ArrayData> dictionary_data;
return MakeArrayOfNull(type.value_type(), 0, &(*out_)->dictionary);
}

Status Visit(const DataType& type) {
return Status::NotImplemented("construction of all-null ", type);
}

Status CreateChild(int i, int64_t length, std::shared_ptr<ArrayData>* out) {
NullArrayFactory child_factory(type_->child(i)->type(), length,
&(*out_)->child_data[i]);
child_factory.buffer_ = buffer_;
return child_factory.Create();
}

std::shared_ptr<DataType> type_;
int64_t length_;
std::shared_ptr<ArrayData>* out_;
std::shared_ptr<Buffer> buffer_;
};

} // namespace internal

Status MakeArrayOfNull(const std::shared_ptr<DataType>& type, int64_t length,
std::shared_ptr<Array>* out) {
std::shared_ptr<ArrayData> out_data;
RETURN_NOT_OK(internal::NullArrayFactory(type, length, &out_data).Create());
*out = MakeArray(out_data);
return Status::OK();
}

namespace internal {

std::vector<ArrayVector> RechunkArraysConsistently(
const std::vector<ArrayVector>& groups) {
if (groups.size() <= 1) {
Expand Down
18 changes: 16 additions & 2 deletions cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ struct ARROW_EXPORT ArrayData {
ARROW_EXPORT
std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data);

/// \brief Create a strongly-typed Array instance with all elements null
/// \param[in] type the array type
/// \param[in] length the array length
/// \param[out] out resulting Array instance
ARROW_EXPORT
Status MakeArrayOfNull(const std::shared_ptr<DataType>& type, int64_t length,
std::shared_ptr<Array>* out);

// ----------------------------------------------------------------------
// User array accessor types

Expand Down Expand Up @@ -521,12 +529,15 @@ class ARROW_EXPORT ListArray : public Array {
/// Return pointer to raw value offsets accounting for any slice offset
const int32_t* raw_value_offsets() const { return raw_value_offsets_ + data_->offset; }

// Neither of these functions will perform boundschecking
// The following functions will not perform boundschecking
int32_t value_offset(int64_t i) const { return raw_value_offsets_[i + data_->offset]; }
int32_t value_length(int64_t i) const {
i += data_->offset;
return raw_value_offsets_[i + 1] - raw_value_offsets_[i];
}
std::shared_ptr<Array> value_slice(int64_t i) const {
return values_->Slice(value_offset(i), value_length(i));
}

protected:
// This constructor defers SetData to a derived array class
Expand Down Expand Up @@ -596,12 +607,15 @@ class ARROW_EXPORT FixedSizeListArray : public Array {

std::shared_ptr<DataType> value_type() const;

// Neither of these functions will perform boundschecking
// The following functions will not perform boundschecking
int32_t value_offset(int64_t i) const {
i += data_->offset;
return static_cast<int32_t>(list_size_ * i);
}
int32_t value_length(int64_t i = 0) const { return list_size_; }
std::shared_ptr<Array> value_slice(int64_t i) const {
return values_->Slice(value_offset(i), value_length(i));
}

protected:
void SetData(const std::shared_ptr<ArrayData>& data);
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/array/builder_primitive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,11 @@ Status BooleanBuilder::AppendValues(const std::vector<bool>& values) {
return Status::OK();
}

Status BooleanBuilder::AppendValues(int64_t length, bool value) {
RETURN_NOT_OK(Reserve(length));
data_builder_.UnsafeAppend(length, value);
ArrayBuilder::UnsafeSetNotNull(length);
return Status::OK();
}

} // namespace arrow
2 changes: 2 additions & 0 deletions cpp/src/arrow/array/builder_primitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
return Status::OK();
}

Status AppendValues(int64_t length, bool value);

Status FinishInternal(std::shared_ptr<ArrayData>* out) override;

/// \cond FALSE
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/arrow/compute/benchmark-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,28 @@ void RegressionSetArgs(benchmark::internal::Benchmark* bench) {
BenchmarkSetArgsWithSizes(bench, {kL1Size});
}

// RAII struct to handle some of the boilerplate in regression benchmarks
struct RegressionArgs {
// size of memory tested (per iteration) in bytes
const int64_t size;

// proportion of nulls in generated arrays
const double null_proportion;

explicit RegressionArgs(benchmark::State& state)
: size(state.range(0)),
null_proportion(static_cast<double>(state.range(1)) / 100.0),
state_(state) {}

~RegressionArgs() {
state_.counters["size"] = static_cast<double>(size);
state_.counters["null_percent"] = static_cast<double>(state_.range(1));
state_.SetBytesProcessed(state_.iterations() * size);
}

private:
benchmark::State& state_;
};

} // namespace compute
} // namespace arrow
8 changes: 6 additions & 2 deletions cpp/src/arrow/compute/kernels/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ arrow_install_all_headers("arrow/compute/kernels")
add_arrow_test(boolean-test PREFIX "arrow-compute")
add_arrow_test(cast-test PREFIX "arrow-compute")
add_arrow_test(hash-test PREFIX "arrow-compute")
add_arrow_test(take-test PREFIX "arrow-compute")
add_arrow_test(util-internal-test PREFIX "arrow-compute")

# Aggregates
add_arrow_test(aggregate-test PREFIX "arrow-compute")
add_arrow_benchmark(aggregate-benchmark PREFIX "arrow-compute")

# Filters
# Comparison
add_arrow_test(compare-test PREFIX "arrow-compute")
add_arrow_benchmark(compare-benchmark PREFIX "arrow-compute")

# Selection
add_arrow_test(take-test PREFIX "arrow-compute")
add_arrow_test(filter-test PREFIX "arrow-compute")
add_arrow_benchmark(filter-benchmark PREFIX "arrow-compute")
84 changes: 84 additions & 0 deletions cpp/src/arrow/compute/kernels/compare-benchmark.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "benchmark/benchmark.h"

#include <vector>

#include "arrow/compute/benchmark-util.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/compare.h"
#include "arrow/compute/test-util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"

namespace arrow {
namespace compute {

constexpr auto kSeed = 0x94378165;

static void CompareArrayScalarKernel(benchmark::State& state) {
const int64_t memory_size = state.range(0);
const int64_t array_size = memory_size / sizeof(int64_t);
const double null_percent = static_cast<double>(state.range(1)) / 100.0;
auto rand = random::RandomArrayGenerator(kSeed);
auto array = std::static_pointer_cast<NumericArray<Int64Type>>(
rand.Int64(array_size, -100, 100, null_percent));

CompareOptions ge{GREATER_EQUAL};

FunctionContext ctx;
for (auto _ : state) {
Datum out;
ABORT_NOT_OK(Compare(&ctx, Datum(array), Datum(int64_t(0)), ge, &out));
benchmark::DoNotOptimize(out);
}

state.counters["size"] = static_cast<double>(memory_size);
state.counters["null_percent"] = static_cast<double>(state.range(1));
state.SetBytesProcessed(state.iterations() * array_size * sizeof(int64_t));
}

static void CompareArrayArrayKernel(benchmark::State& state) {
const int64_t memory_size = state.range(0);
const int64_t array_size = memory_size / sizeof(int64_t);
const double null_percent = static_cast<double>(state.range(1)) / 100.0;
auto rand = random::RandomArrayGenerator(kSeed);
auto lhs = std::static_pointer_cast<NumericArray<Int64Type>>(
rand.Int64(array_size, -100, 100, null_percent));
auto rhs = std::static_pointer_cast<NumericArray<Int64Type>>(
rand.Int64(array_size, -100, 100, null_percent));

CompareOptions ge(GREATER_EQUAL);

FunctionContext ctx;
for (auto _ : state) {
Datum out;
ABORT_NOT_OK(Compare(&ctx, Datum(lhs), Datum(rhs), ge, &out));
benchmark::DoNotOptimize(out);
}

state.counters["size"] = static_cast<double>(memory_size);
state.counters["null_percent"] = static_cast<double>(state.range(1));
state.SetBytesProcessed(state.iterations() * array_size * sizeof(int64_t) * 2);
}

BENCHMARK(CompareArrayScalarKernel)->Apply(RegressionSetArgs);
BENCHMARK(CompareArrayArrayKernel)->Apply(RegressionSetArgs);

} // namespace compute
} // namespace arrow
Loading

0 comments on commit a6b210d

Please sign in to comment.