Skip to content

Commit

Permalink
ARROW-8142: [C++][Compute] Explicit no chunks case for WrapDatumsLike
Browse files Browse the repository at this point in the history
When kernels use the `WrapDatumsLike` utility on a chunked array, they invoke the ChunkedArray constructor without an explicit type. This [is forbidden for an empty chunks vector](https://github.com/bkietz/arrow/blob/1cb7a6dcbfc61a734e0e888917fa3938051de82d/cpp/src/arrow/table.h#L41) and causes an assertion failure. Since we always have access to the desired type (through Kernel::out_type), we can avoid this by passing that type to WrapDatumsLike.

Closes #6668 from bkietz/8142-Casting-a-chunked-array-w

Lead-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
2 people authored and pitrou committed Mar 25, 2020
1 parent 04d7da8 commit 960824e
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 58 deletions.
40 changes: 37 additions & 3 deletions cpp/src/arrow/compute/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.

add_custom_target(arrow_compute)

arrow_install_all_headers("arrow/compute")

# pkg-config support
Expand All @@ -24,9 +26,41 @@ arrow_add_pkg_config("arrow-compute")
# Unit tests
#

add_arrow_test(compute_test)
add_arrow_test(expression_test PREFIX "arrow-compute")
add_arrow_test(operations/operations_test PREFIX "arrow-compute")
# Wrapper around add_arrow_test for compute-module tests.
# Supplies module defaults (PREFIX "arrow-compute", LABELS "arrow_compute")
# when the caller does not override them, and forwards everything else
# through to add_arrow_test unchanged.
function(ADD_ARROW_COMPUTE_TEST REL_TEST_NAME)
  set(options)
  set(one_value_args PREFIX)
  set(multi_value_args LABELS)
  cmake_parse_arguments(ARG
                        "${options}"
                        "${one_value_args}"
                        "${multi_value_args}"
                        ${ARGN})

  # Fall back to the module-wide defaults when not explicitly provided.
  if(NOT ARG_PREFIX)
    set(ARG_PREFIX "arrow-compute")
  endif()

  if(NOT ARG_LABELS)
    set(ARG_LABELS "arrow_compute")
  endif()

  add_arrow_test(${REL_TEST_NAME}
                 EXTRA_LINK_LIBS
                 ${ARROW_DATASET_TEST_LINK_LIBS}
                 PREFIX
                 ${ARG_PREFIX}
                 LABELS
                 ${ARG_LABELS}
                 ${ARG_UNPARSED_ARGUMENTS})
endfunction()

add_arrow_compute_test(compute_test)
add_arrow_compute_test(expression_test)
add_arrow_compute_test(operations/operations_test)
add_arrow_benchmark(compute_benchmark)

add_subdirectory(kernels)
33 changes: 20 additions & 13 deletions cpp/src/arrow/compute/kernels/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,35 @@

arrow_install_all_headers("arrow/compute/kernels")

add_arrow_test(boolean_test PREFIX "arrow-compute")
add_arrow_test(cast_test PREFIX "arrow-compute")
add_arrow_test(hash_test PREFIX "arrow-compute")
add_arrow_test(isin_test PREFIX "arrow-compute")
add_arrow_test(match_test PREFIX "arrow-compute")
add_arrow_test(sort_to_indices_test PREFIX "arrow-compute")
add_arrow_test(nth_to_indices_test PREFIX "arrow-compute")
add_arrow_test(util_internal_test PREFIX "arrow-compute")
add_arrow_test(add-test PREFIX "arrow-compute")
add_arrow_compute_test(boolean_test)
add_arrow_compute_test(cast_test)
add_arrow_compute_test(hash_test)
add_arrow_compute_test(isin_test)
add_arrow_compute_test(match_test)
add_arrow_compute_test(sort_to_indices_test)
add_arrow_compute_test(nth_to_indices_test)
add_arrow_compute_test(util_internal_test)
add_arrow_compute_test(add_test)

# Aggregates
add_arrow_compute_test(aggregate_test)

# Comparison
add_arrow_compute_test(compare_test)

# Selection
add_arrow_compute_test(take_test)
add_arrow_compute_test(filter_test)

add_arrow_benchmark(sort_to_indices_benchmark PREFIX "arrow-compute")
add_arrow_benchmark(nth_to_indices_benchmark PREFIX "arrow-compute")

# Aggregates
add_arrow_test(aggregate_test PREFIX "arrow-compute")
add_arrow_benchmark(aggregate_benchmark PREFIX "arrow-compute")

# Comparison
add_arrow_test(compare_test PREFIX "arrow-compute")
add_arrow_benchmark(compare_benchmark PREFIX "arrow-compute")

# Selection
add_arrow_test(take_test PREFIX "arrow-compute")
add_arrow_test(filter_test PREFIX "arrow-compute")
add_arrow_benchmark(filter_benchmark PREFIX "arrow-compute")
add_arrow_benchmark(take_benchmark PREFIX "arrow-compute")
File renamed without changes.
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/kernels/boolean.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Status Invert(FunctionContext* ctx, const Datum& value, Datum* out) {
std::vector<Datum> result;
RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, &kernel, value, &result));

*out = detail::WrapDatumsLike(value, result);
*out = detail::WrapDatumsLike(value, invert.out_type(), result);
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/kernels/cast.cc
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ Status InvokeWithAllocation(FunctionContext* ctx, UnaryKernel* func, const Datum
RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, func, input, &result));
}
ARROW_RETURN_IF_ERROR(ctx);
*out = detail::WrapDatumsLike(input, result);
*out = detail::WrapDatumsLike(input, func->out_type(), result);
return Status::OK();
}

Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/compute/kernels/cast_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,15 @@ TEST_F(TestCast, SameTypeZeroCopy) {
AssertBufferSame(*arr, *result, 1);
}

TEST_F(TestCast, ZeroChunks) {
  // ARROW-8142: casting a chunked array with no chunks must not trip the
  // ChunkedArray assertion; it should yield an empty chunked array of the
  // target type.
  auto empty_chunked = std::make_shared<ChunkedArray>(ArrayVector{}, int32());

  Datum out;
  ASSERT_OK(Cast(&this->ctx_, empty_chunked, utf8(), {}, &out));

  ASSERT_EQ(out.kind(), Datum::CHUNKED_ARRAY);
  AssertChunkedEqual(*out.chunked_array(), ChunkedArray({}, utf8()));
}

TEST_F(TestCast, FromBoolean) {
CastOptions options;

Expand Down
30 changes: 9 additions & 21 deletions cpp/src/arrow/compute/kernels/hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ class NullHashKernelImpl : public HashKernelImpl {
return Status::OK();
}

std::shared_ptr<DataType> out_type() const override { return null(); }
std::shared_ptr<DataType> out_type() const override { return action_.out_type(); }

protected:
MemoryPool* pool_;
Expand Down Expand Up @@ -568,31 +568,19 @@ Status DictionaryEncode(FunctionContext* ctx, const Datum& value, Datum* out) {
std::unique_ptr<HashKernel> func;
RETURN_NOT_OK(GetDictionaryEncodeKernel(ctx, value.type(), &func));

std::shared_ptr<Array> dictionary;
std::shared_ptr<Array> dict;
std::vector<Datum> indices_outputs;
RETURN_NOT_OK(InvokeHash(ctx, func.get(), value, &indices_outputs, &dictionary));
RETURN_NOT_OK(InvokeHash(ctx, func.get(), value, &indices_outputs, &dict));

auto dict_type = dictionary(func->out_type(), dict->type());

// Wrap indices in dictionary arrays for result
std::vector<std::shared_ptr<Array>> dict_chunks;
std::shared_ptr<DataType> dict_type;

if (indices_outputs.size() == 0) {
// Special case: empty was an empty chunked array
DCHECK_EQ(value.kind(), Datum::CHUNKED_ARRAY);
dict_type = ::arrow::dictionary(int32(), dictionary->type());
*out = std::make_shared<ChunkedArray>(dict_chunks, dict_type);
} else {
// Create the dictionary type
DCHECK_EQ(indices_outputs[0].kind(), Datum::ARRAY);
dict_type = ::arrow::dictionary(indices_outputs[0].array()->type, dictionary->type());

// Create DictionaryArray for each piece yielded by the kernel invocations
for (const Datum& datum : indices_outputs) {
dict_chunks.emplace_back(std::make_shared<DictionaryArray>(
dict_type, MakeArray(datum.array()), dictionary));
}
*out = detail::WrapArraysLike(value, dict_chunks);
for (const Datum& datum : indices_outputs) {
dict_chunks.emplace_back(
std::make_shared<DictionaryArray>(dict_type, datum.make_array(), dict));
}
*out = detail::WrapArraysLike(value, dict_type, dict_chunks);

return Status::OK();
}
Expand Down
49 changes: 49 additions & 0 deletions cpp/src/arrow/compute/kernels/hash_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,18 @@ TYPED_TEST(TestHashKernelPrimitive, DictEncode) {
ArrayFromJSON(type, "[1, 4, 3]"), ArrayFromJSON(int32(), "[0, null, 1, 2, 0]"));
}

TYPED_TEST(TestHashKernelPrimitive, ZeroChunks) {
  // ARROW-8142: dictionary-encoding a zero-chunk chunked array should
  // produce an empty chunked array with the expected dictionary type.
  auto value_type = TypeTraits<TypeParam>::type_singleton();
  auto empty_chunked = std::make_shared<ChunkedArray>(ArrayVector{}, value_type);

  Datum encoded;
  ASSERT_OK(DictionaryEncode(&this->ctx_, empty_chunked, &encoded));

  ASSERT_EQ(encoded.kind(), Datum::CHUNKED_ARRAY);
  AssertChunkedEqual(*encoded.chunked_array(),
                     ChunkedArray({}, dictionary(int32(), value_type)));
}

TYPED_TEST(TestHashKernelPrimitive, PrimitiveResizeTable) {
using T = typename TypeParam::c_type;

Expand Down Expand Up @@ -362,6 +374,43 @@ class TestHashKernelBinaryTypes : public TestHashKernel {

TYPED_TEST_SUITE(TestHashKernelBinaryTypes, StringTypes);

TYPED_TEST(TestHashKernelBinaryTypes, ZeroChunks) {
  // ARROW-8142: same zero-chunk round trip as the primitive case, but for
  // binary-like value types.
  auto value_type = this->type();
  auto empty_chunked = std::make_shared<ChunkedArray>(ArrayVector{}, value_type);

  Datum encoded;
  ASSERT_OK(DictionaryEncode(&this->ctx_, empty_chunked, &encoded));

  ASSERT_EQ(encoded.kind(), Datum::CHUNKED_ARRAY);
  AssertChunkedEqual(*encoded.chunked_array(),
                     ChunkedArray({}, dictionary(int32(), value_type)));
}

TYPED_TEST(TestHashKernelBinaryTypes, TwoChunks) {
  // Dictionary-encoding a two-chunk chunked array should yield one
  // DictionaryArray chunk per input chunk, all sharing a single dictionary.
  auto type = this->type();

  Datum result;
  auto two_chunks = std::make_shared<ChunkedArray>(
      ArrayVector{
          ArrayFromJSON(type, R"(["a"])"),
          ArrayFromJSON(type, R"(["b"])"),
      },
      type);
  ASSERT_OK(DictionaryEncode(&this->ctx_, two_chunks, &result));

  auto dict_type = dictionary(int32(), type);
  // NOTE: named dict_values (not "dictionary") to avoid shadowing the
  // dictionary() type factory used above.
  auto dict_values = ArrayFromJSON(type, R"(["a", "b"])");

  auto chunk_0 = std::make_shared<DictionaryArray>(
      dict_type, ArrayFromJSON(int32(), "[0]"), dict_values);
  auto chunk_1 = std::make_shared<DictionaryArray>(
      dict_type, ArrayFromJSON(int32(), "[1]"), dict_values);

  ASSERT_EQ(result.kind(), Datum::CHUNKED_ARRAY);
  AssertChunkedEqual(*result.chunked_array(),
                     ChunkedArray({chunk_0, chunk_1}, dict_type));
}

TYPED_TEST(TestHashKernelBinaryTypes, Unique) {
this->CheckUniqueP({"test", "", "test2", "test"}, {true, false, true, true},
{"test", "", "test2"}, {1, 0, 1});
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/kernels/isin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ Status IsIn(FunctionContext* ctx, const Datum& left, const Datum& right, Datum*
detail::PrimitiveAllocatingUnaryKernel kernel(lkernel.get());
RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, &kernel, left, &outputs));

*out = detail::WrapDatumsLike(left, outputs);
*out = detail::WrapDatumsLike(left, lkernel->out_type(), outputs);
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/kernels/match.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ Status Match(FunctionContext* ctx, const Datum& haystack, const Datum& needles,
RETURN_NOT_OK(GetMatchKernel(ctx, haystack.type(), needles, &kernel));
RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, kernel.get(), haystack, &outputs));

*out = detail::WrapDatumsLike(haystack, outputs);
*out = detail::WrapDatumsLike(haystack, kernel->out_type(), outputs);
return Status::OK();
}

Expand Down
13 changes: 7 additions & 6 deletions cpp/src/arrow/compute/kernels/util_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,24 +170,25 @@ Status InvokeBinaryArrayKernel(FunctionContext* ctx, BinaryKernel* kernel,
const Datum& left, const Datum& right, Datum* output) {
std::vector<Datum> result;
RETURN_NOT_OK(InvokeBinaryArrayKernel(ctx, kernel, left, right, &result));
*output = detail::WrapDatumsLike(left, result);
*output = detail::WrapDatumsLike(left, kernel->out_type(), result);
return Status::OK();
}

Datum WrapArraysLike(const Datum& value,
Datum WrapArraysLike(const Datum& value, std::shared_ptr<DataType> type,
const std::vector<std::shared_ptr<Array>>& arrays) {
// Create right kind of datum
if (value.kind() == Datum::ARRAY) {
return Datum(arrays[0]->data());
} else if (value.kind() == Datum::CHUNKED_ARRAY) {
return Datum(std::make_shared<ChunkedArray>(arrays));
return Datum(std::make_shared<ChunkedArray>(arrays, std::move(type)));
} else {
ARROW_LOG(FATAL) << "unhandled datum kind";
return Datum();
}
}

Datum WrapDatumsLike(const Datum& value, const std::vector<Datum>& datums) {
Datum WrapDatumsLike(const Datum& value, std::shared_ptr<DataType> type,
const std::vector<Datum>& datums) {
// Create right kind of datum
if (value.kind() == Datum::ARRAY) {
DCHECK_EQ(1, datums.size());
Expand All @@ -196,9 +197,9 @@ Datum WrapDatumsLike(const Datum& value, const std::vector<Datum>& datums) {
std::vector<std::shared_ptr<Array>> arrays;
for (const Datum& datum : datums) {
DCHECK_EQ(Datum::ARRAY, datum.kind());
arrays.emplace_back(MakeArray(datum.array()));
arrays.push_back(datum.make_array());
}
return Datum(std::make_shared<ChunkedArray>(arrays));
return Datum(std::make_shared<ChunkedArray>(std::move(arrays), std::move(type)));
} else {
ARROW_LOG(FATAL) << "unhandled datum kind";
return Datum();
Expand Down
10 changes: 4 additions & 6 deletions cpp/src/arrow/compute/kernels/util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#ifndef ARROW_COMPUTE_KERNELS_UTIL_INTERNAL_H
#define ARROW_COMPUTE_KERNELS_UTIL_INTERNAL_H
#pragma once

#include <memory>
#include <vector>
Expand Down Expand Up @@ -104,11 +103,12 @@ Status AssignNullIntersection(FunctionContext* ctx, const ArrayData& left,
const ArrayData& right, ArrayData* output);

ARROW_EXPORT
Datum WrapArraysLike(const Datum& value,
Datum WrapArraysLike(const Datum& value, std::shared_ptr<DataType> type,
const std::vector<std::shared_ptr<Array>>& arrays);

ARROW_EXPORT
Datum WrapDatumsLike(const Datum& value, const std::vector<Datum>& datums);
Datum WrapDatumsLike(const Datum& value, std::shared_ptr<DataType> type,
const std::vector<Datum>& datums);

/// \brief Kernel used to preallocate outputs for primitive types. This
/// does not include allocations for the validity bitmap (PropagateNulls
Expand Down Expand Up @@ -152,5 +152,3 @@ class ARROW_EXPORT PrimitiveAllocatingBinaryKernel : public BinaryKernel {

} // namespace compute
} // namespace arrow

#endif // ARROW_COMPUTE_KERNELS_UTIL_INTERNAL_H
6 changes: 3 additions & 3 deletions cpp/src/arrow/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const {
if (null_count_ != other.null_count()) {
return false;
}
if (length_ == 0) {
// We cannot toggle check_metadata here yet, so we don't check it
return type_->Equals(*other.type_, /*check_metadata=*/false);
// We cannot toggle check_metadata here yet, so we don't check it
if (!type_->Equals(*other.type_, /*check_metadata=*/false)) {
return false;
}

// Check contents of the underlying arrays. This checks for equality of
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/record_batch.h"
Expand All @@ -42,8 +43,8 @@ class ARROW_EXPORT ChunkedArray {
explicit ChunkedArray(ArrayVector chunks);

/// \brief Construct a chunked array from a single Array
explicit ChunkedArray(const std::shared_ptr<Array>& chunk)
: ChunkedArray(ArrayVector({chunk})) {}
explicit ChunkedArray(std::shared_ptr<Array> chunk)
: ChunkedArray(ArrayVector{std::move(chunk)}) {}

/// \brief Construct a chunked array from a vector of arrays and a data type
///
Expand Down
12 changes: 12 additions & 0 deletions python/pyarrow/tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,17 @@ def test_cast_chunked_array():
assert casted.equals(expected)


def test_cast_chunked_array_empty():
    # ARROW-8142: casting a chunked array with zero chunks must succeed and
    # produce an empty chunked array of the target type.
    cases = [
        (pa.dictionary(pa.int8(), pa.string()), pa.string()),
        (pa.int64(), pa.int32()),
    ]
    for source_type, target_type in cases:
        empty = pa.chunked_array([], type=source_type)
        casted = empty.cast(target_type)
        assert casted.equals(pa.chunked_array([], type=target_type))


def test_chunked_array_data_warns():
with pytest.warns(FutureWarning):
res = pa.chunked_array([[]]).data
Expand Down Expand Up @@ -1139,6 +1150,7 @@ def test_dictionary_encode_sliced():
assert result.equals(expected)
result = pa.chunked_array([arr]).dictionary_encode()
assert result.num_chunks == 1
assert result.type == expected.type
assert result.chunk(0).equals(expected)
result = pa.chunked_array([], type=arr.type).dictionary_encode()
assert result.num_chunks == 0
Expand Down
Loading

0 comments on commit 960824e

Please sign in to comment.