Skip to content

Commit

Permalink
ARROW-7802: [C++][Python] Support LargeBinary and LargeString in the …
Browse files Browse the repository at this point in the history
…hash kernel

templatized BinaryMemoTable so that it can hold either a BinaryBuilder or a LargeBinaryBuilder.

Also added Python binding for ValueCounts().

Closes #6548 from brills/hash

Lead-authored-by: Zhuo Peng <1835738+brills@users.noreply.github.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
brills and pitrou committed Mar 12, 2020
1 parent 1270034 commit f9ce855
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 70 deletions.
7 changes: 4 additions & 3 deletions cpp/src/arrow/array/dict_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,16 @@ struct DictionaryTraits<T, enable_if_base_binary<T>> {
const MemoTableType& memo_table,
int64_t start_offset,
std::shared_ptr<ArrayData>* out) {
using offset_type = typename T::offset_type;
std::shared_ptr<Buffer> dict_offsets;
std::shared_ptr<Buffer> dict_data;

// Create the offsets buffer
auto dict_length = static_cast<int64_t>(memo_table.size() - start_offset);
if (dict_length > 0) {
RETURN_NOT_OK(AllocateBuffer(
pool, TypeTraits<Int32Type>::bytes_required(dict_length + 1), &dict_offsets));
auto raw_offsets = reinterpret_cast<int32_t*>(dict_offsets->mutable_data());
RETURN_NOT_OK(
AllocateBuffer(pool, sizeof(offset_type) * (dict_length + 1), &dict_offsets));
auto raw_offsets = reinterpret_cast<offset_type*>(dict_offsets->mutable_data());
memo_table.CopyOffsets(static_cast<int32_t>(start_offset), raw_offsets);
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/kernels/hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,9 @@ struct HashKernelTraits<Type, Action, with_error_status, with_memo_visit_null,
PROCESS(Time64Type) \
PROCESS(TimestampType) \
PROCESS(BinaryType) \
PROCESS(LargeBinaryType) \
PROCESS(StringType) \
PROCESS(LargeStringType) \
PROCESS(FixedSizeBinaryType) \
PROCESS(Decimal128Type)

Expand Down
98 changes: 56 additions & 42 deletions cpp/src/arrow/compute/kernels/hash_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
namespace arrow {
namespace compute {

using StringTypes =
::testing::Types<StringType, LargeStringType, BinaryType, LargeBinaryType>;

// ----------------------------------------------------------------------
// Dictionary tests

Expand Down Expand Up @@ -325,57 +328,77 @@ TEST_F(TestHashKernel, DictEncodeBoolean) {
ArrayFromJSON(boolean(), "[true]"), ArrayFromJSON(int32(), "[0, null, 0]"));
}

TEST_F(TestHashKernel, UniqueBinary) {
CheckUnique<BinaryType, std::string>(
&this->ctx_, binary(), {"test", "", "test2", "test"}, {true, false, true, true},
{"test", "", "test2"}, {1, 0, 1});
template <typename ArrowType>
class TestHashKernelBinaryTypes : public TestHashKernel {
protected:
std::shared_ptr<DataType> type() { return TypeTraits<ArrowType>::type_singleton(); }

void CheckDictEncodeP(const std::vector<std::string>& in_values,
const std::vector<bool>& in_is_valid,
const std::vector<std::string>& out_values,
const std::vector<bool>& out_is_valid,
const std::vector<int32_t>& out_indices) {
CheckDictEncode<ArrowType, std::string>(&this->ctx_, type(), in_values, in_is_valid,
out_values, out_is_valid, out_indices);
}

void CheckValueCountsP(const std::vector<std::string>& in_values,
const std::vector<bool>& in_is_valid,
const std::vector<std::string>& out_values,
const std::vector<bool>& out_is_valid,
const std::vector<int64_t>& out_counts) {
CheckValueCounts<ArrowType, std::string>(&this->ctx_, type(), in_values, in_is_valid,
out_values, out_is_valid, out_counts);
}

void CheckUniqueP(const std::vector<std::string>& in_values,
const std::vector<bool>& in_is_valid,
const std::vector<std::string>& out_values,
const std::vector<bool>& out_is_valid) {
CheckUnique<ArrowType, std::string>(&this->ctx_, type(), in_values, in_is_valid,
out_values, out_is_valid);
}
};

CheckUnique<StringType, std::string>(&this->ctx_, utf8(), {"test", "", "test2", "test"},
{true, false, true, true}, {"test", "", "test2"},
{1, 0, 1});
TYPED_TEST_CASE(TestHashKernelBinaryTypes, StringTypes);

TYPED_TEST(TestHashKernelBinaryTypes, Unique) {
this->CheckUniqueP({"test", "", "test2", "test"}, {true, false, true, true},
{"test", "", "test2"}, {1, 0, 1});

// Sliced
CheckUnique(
&this->ctx_,
ArrayFromJSON(binary(), R"(["ab", null, "cd", "ef", "cd", "gh"])")->Slice(1, 4),
ArrayFromJSON(binary(), R"([null, "cd", "ef"])"));
ArrayFromJSON(this->type(), R"(["ab", null, "cd", "ef", "cd", "gh"])")->Slice(1, 4),
ArrayFromJSON(this->type(), R"([null, "cd", "ef"])"));
}

TEST_F(TestHashKernel, ValueCountsBinary) {
CheckValueCounts<BinaryType, std::string>(
&this->ctx_, binary(), {"test", "", "test2", "test"}, {true, false, true, true},
{"test", "", "test2"}, {1, 0, 1}, {2, 1, 1});

CheckValueCounts<StringType, std::string>(
&this->ctx_, utf8(), {"test", "", "test2", "test"}, {true, false, true, true},
{"test", "", "test2"}, {1, 0, 1}, {2, 1, 1});
TYPED_TEST(TestHashKernelBinaryTypes, ValueCounts) {
this->CheckValueCountsP({"test", "", "test2", "test"}, {true, false, true, true},
{"test", "", "test2"}, {1, 0, 1}, {2, 1, 1});

// Sliced
CheckValueCounts(
&this->ctx_,
ArrayFromJSON(binary(), R"(["ab", null, "cd", "ab", "cd", "ef"])")->Slice(1, 4),
ArrayFromJSON(binary(), R"([null, "cd", "ab"])"),
ArrayFromJSON(this->type(), R"(["ab", null, "cd", "ab", "cd", "ef"])")->Slice(1, 4),
ArrayFromJSON(this->type(), R"([null, "cd", "ab"])"),
ArrayFromJSON(int64(), "[1, 2, 1]"));
}

TEST_F(TestHashKernel, DictEncodeBinary) {
CheckDictEncode<BinaryType, std::string>(
&this->ctx_, binary(), {"test", "", "test2", "test", "baz"},
{true, false, true, true, true}, {"test", "test2", "baz"}, {}, {0, 0, 1, 0, 2});

CheckDictEncode<StringType, std::string>(
&this->ctx_, utf8(), {"test", "", "test2", "test", "baz"},
{true, false, true, true, true}, {"test", "test2", "baz"}, {}, {0, 0, 1, 0, 2});
TYPED_TEST(TestHashKernelBinaryTypes, DictEncode) {
this->CheckDictEncodeP({"test", "", "test2", "test", "baz"},
{true, false, true, true, true}, {"test", "test2", "baz"}, {},
{0, 0, 1, 0, 2});

// Sliced
CheckDictEncode(
&this->ctx_,
ArrayFromJSON(binary(), R"(["ab", null, "cd", "ab", "cd", "ef"])")->Slice(1, 4),
ArrayFromJSON(binary(), R"(["cd", "ab"])"),
ArrayFromJSON(this->type(), R"(["ab", null, "cd", "ab", "cd", "ef"])")->Slice(1, 4),
ArrayFromJSON(this->type(), R"(["cd", "ab"])"),
ArrayFromJSON(int32(), "[null, 0, 1, 0]"));
}

TEST_F(TestHashKernel, BinaryResizeTable) {
TYPED_TEST(TestHashKernelBinaryTypes, BinaryResizeTable) {
const int32_t kTotalValues = 10000;
#if !defined(ARROW_VALGRIND)
const int32_t kRepeats = 10;
Expand Down Expand Up @@ -403,18 +426,9 @@ TEST_F(TestHashKernel, BinaryResizeTable) {
indices.push_back(index);
}

CheckUnique<BinaryType, std::string>(&this->ctx_, binary(), values, {}, uniques, {});
CheckValueCounts<BinaryType, std::string>(&this->ctx_, binary(), values, {}, uniques,
{}, counts);

CheckDictEncode<BinaryType, std::string>(&this->ctx_, binary(), values, {}, uniques, {},
indices);

CheckUnique<StringType, std::string>(&this->ctx_, utf8(), values, {}, uniques, {});
CheckValueCounts<StringType, std::string>(&this->ctx_, utf8(), values, {}, uniques, {},
counts);
CheckDictEncode<StringType, std::string>(&this->ctx_, utf8(), values, {}, uniques, {},
indices);
this->CheckUniqueP(values, {}, uniques, {});
this->CheckValueCountsP(values, {}, uniques, {}, counts);
this->CheckDictEncodeP(values, {}, uniques, {}, indices);
}

TEST_F(TestHashKernel, UniqueFixedSizeBinary) {
Expand Down
46 changes: 28 additions & 18 deletions cpp/src/arrow/util/hashing.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,10 @@ class SmallScalarMemoTable : public MemoTable {
// ----------------------------------------------------------------------
// A memoization table for variable-sized binary data.

template <typename BinaryBuilderT>
class BinaryMemoTable : public MemoTable {
public:
using builder_offset_type = typename BinaryBuilderT::offset_type;
explicit BinaryMemoTable(MemoryPool* pool, int64_t entries = 0,
int64_t values_size = -1)
: hash_table_(pool, static_cast<uint64_t>(entries)), binary_builder_(pool) {
Expand All @@ -599,7 +601,7 @@ class BinaryMemoTable : public MemoTable {
DCHECK_OK(binary_builder_.ReserveData(data_size));
}

int32_t Get(const void* data, int32_t length) const {
int32_t Get(const void* data, builder_offset_type length) const {
hash_t h = ComputeStringHash<0>(data, length);
auto p = Lookup(h, data, length);
if (p.second) {
Expand All @@ -610,11 +612,11 @@ class BinaryMemoTable : public MemoTable {
}

int32_t Get(const util::string_view& value) const {
return Get(value.data(), static_cast<int32_t>(value.length()));
return Get(value.data(), static_cast<builder_offset_type>(value.length()));
}

template <typename Func1, typename Func2>
Status GetOrInsert(const void* data, int32_t length, Func1&& on_found,
Status GetOrInsert(const void* data, builder_offset_type length, Func1&& on_found,
Func2&& on_not_found, int32_t* out_memo_index) {
hash_t h = ComputeStringHash<0>(data, length);
auto p = Lookup(h, data, length);
Expand All @@ -639,25 +641,26 @@ class BinaryMemoTable : public MemoTable {
template <typename Func1, typename Func2>
Status GetOrInsert(const util::string_view& value, Func1&& on_found,
Func2&& on_not_found, int32_t* out_memo_index) {
return GetOrInsert(value.data(), static_cast<int32_t>(value.length()),
return GetOrInsert(value.data(), static_cast<builder_offset_type>(value.length()),
std::forward<Func1>(on_found), std::forward<Func2>(on_not_found),
out_memo_index);
}

Status GetOrInsert(const void* data, int32_t length, int32_t* out_memo_index) {
Status GetOrInsert(const void* data, builder_offset_type length,
int32_t* out_memo_index) {
return GetOrInsert(data, length, [](int32_t i) {}, [](int32_t i) {}, out_memo_index);
}

Status GetOrInsert(const util::string_view& value, int32_t* out_memo_index) {
return GetOrInsert(value.data(), static_cast<int32_t>(value.length()),
return GetOrInsert(value.data(), static_cast<builder_offset_type>(value.length()),
out_memo_index);
}

int32_t GetNull() const { return null_index_; }

template <typename Func1, typename Func2>
int32_t GetOrInsertNull(Func1&& on_found, Func2&& on_not_found) {
auto memo_index = GetNull();
int32_t memo_index = GetNull();
if (memo_index == kKeyNotFound) {
memo_index = null_index_ = size();
DCHECK_OK(binary_builder_.AppendNull());
Expand Down Expand Up @@ -685,12 +688,13 @@ class BinaryMemoTable : public MemoTable {
void CopyOffsets(int32_t start, Offset* out_data) const {
DCHECK_LE(start, size());

const int32_t* offsets = binary_builder_.offsets_data();
int32_t delta = offsets[start];
const builder_offset_type* offsets = binary_builder_.offsets_data();
const builder_offset_type delta = offsets[start];
for (int32_t i = start; i < size(); ++i) {
int32_t adjusted_offset = offsets[i] - delta;
const builder_offset_type adjusted_offset = offsets[i] - delta;
Offset cast_offset = static_cast<Offset>(adjusted_offset);
assert(static_cast<int32_t>(cast_offset) == adjusted_offset); // avoid truncation
assert(static_cast<builder_offset_type>(cast_offset) ==
adjusted_offset); // avoid truncation
*out_data++ = cast_offset;
}

Expand All @@ -713,8 +717,8 @@ class BinaryMemoTable : public MemoTable {
DCHECK_LE(start, size());

// The absolute byte offset of `start` value in the binary buffer.
int32_t offset = binary_builder_.offset(start);
auto length = binary_builder_.value_data_length() - static_cast<size_t>(offset);
const builder_offset_type offset = binary_builder_.offset(start);
const auto length = binary_builder_.value_data_length() - static_cast<size_t>(offset);

if (out_size != -1) {
assert(static_cast<int64_t>(length) <= out_size);
Expand Down Expand Up @@ -750,7 +754,7 @@ class BinaryMemoTable : public MemoTable {
return;
}

int32_t left_offset = binary_builder_.offset(start);
builder_offset_type left_offset = binary_builder_.offset(start);

// Ensure that the data length is exactly missing width_size bytes to fit
// in the expected output (n_values * width_size).
Expand Down Expand Up @@ -796,12 +800,12 @@ class BinaryMemoTable : public MemoTable {
using HashTableType = HashTable<Payload>;
using HashTableEntry = typename HashTable<Payload>::Entry;
HashTableType hash_table_;
BinaryBuilder binary_builder_;
BinaryBuilderT binary_builder_;

int32_t null_index_ = kKeyNotFound;

std::pair<const HashTableEntry*, bool> Lookup(hash_t h, const void* data,
int32_t length) const {
builder_offset_type length) const {
auto cmp_func = [=](const Payload* payload) {
util::string_view lhs = binary_builder_.GetView(payload->memo_index);
util::string_view rhs(static_cast<const char*>(data), length);
Expand Down Expand Up @@ -832,8 +836,14 @@ struct HashTraits<T, enable_if_t<has_c_type<T>::value && !is_8bit_int<T>::value>
};

template <typename T>
struct HashTraits<T, enable_if_has_string_view<T>> {
using MemoTableType = BinaryMemoTable;
struct HashTraits<T, enable_if_t<has_string_view<T>::value &&
!std::is_base_of<LargeBinaryType, T>::value>> {
using MemoTableType = BinaryMemoTable<BinaryBuilder>;
};

template <typename T>
struct HashTraits<T, enable_if_t<std::is_base_of<LargeBinaryType, T>::value>> {
using MemoTableType = BinaryMemoTable<LargeBinaryBuilder>;
};

template <typename MemoTableType>
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/util/hashing_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ TEST(BinaryMemoTable, Basics) {
F += '\0';
F += "trailing";

BinaryMemoTable table(default_memory_pool(), 0);
BinaryMemoTable<BinaryBuilder> table(default_memory_pool(), 0);
ASSERT_EQ(table.size(), 0);
AssertGet(table, A, kKeyNotFound);
AssertGetNull(table, kKeyNotFound);
Expand Down Expand Up @@ -458,7 +458,7 @@ TEST(BinaryMemoTable, Stress) {

const auto values = MakeDistinctStrings(n_values);

BinaryMemoTable table(default_memory_pool(), 0);
BinaryMemoTable<BinaryBuilder> table(default_memory_pool(), 0);
std::unordered_map<std::string, int32_t> map;

for (int32_t i = 0; i < n_repeats; ++i) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,12 @@ struct DictEncoderTraits {

template <>
struct DictEncoderTraits<ByteArrayType> {
using MemoTableType = arrow::internal::BinaryMemoTable;
using MemoTableType = arrow::internal::BinaryMemoTable<arrow::BinaryBuilder>;
};

template <>
struct DictEncoderTraits<FLBAType> {
using MemoTableType = arrow::internal::BinaryMemoTable;
using MemoTableType = arrow::internal::BinaryMemoTable<arrow::BinaryBuilder>;
};

// Initially 1024 elements
Expand Down
15 changes: 15 additions & 0 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,21 @@ cdef class Array(_PandasConvertible):

return wrap_datum(out)

def value_counts(self):
"""
Compute counts of unique elements in array.
Returns
-------
An array of <input type "Values", int64_t "Counts"> structs
"""
cdef shared_ptr[CArray] result

with nogil:
check_status(ValueCounts(_context(), CDatum(self.sp_array),
&result))
return pyarrow_wrap_array(result)

@staticmethod
def from_pandas(obj, mask=None, type=None, bint safe=True,
MemoryPool memory_pool=None):
Expand Down
3 changes: 3 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,9 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
CStatus DictionaryEncode(CFunctionContext* context, const CDatum& value,
CDatum* out)

CStatus ValueCounts(CFunctionContext* context, const CDatum& value,
shared_ptr[CArray]* out)

CStatus Sum(CFunctionContext* context, const CDatum& value, CDatum* out)

CStatus Take(CFunctionContext* context, const CDatum& values,
Expand Down
15 changes: 15 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,21 @@ cdef class ChunkedArray(_PandasConvertible):

return pyarrow_wrap_array(result)

def value_counts(self):
"""
Compute counts of unique elements in array.
Returns
-------
An array of <input type "Values", int64_t "Counts"> structs
"""
cdef shared_ptr[CArray] result

with nogil:
check_status(ValueCounts(_context(), CDatum(self.sp_chunked_array),
&result))
return pyarrow_wrap_array(result)

def slice(self, offset=0, length=None):
"""
Compute zero-copy slice of this ChunkedArray
Expand Down
Loading

0 comments on commit f9ce855

Please sign in to comment.