GH-43495: [C++][Compute] Widen the row offset of the row table to 64-bit #43389

Merged · 34 commits · Aug 19, 2024
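In short, the change replaces the 32-bit byte offsets that the row table keeps for its rows with a 64-bit `RowTableImpl::offset_type`, so the accumulated row data is no longer capped at 4 GiB. Below is a minimal, hypothetical sketch of the idea; `RowTableSketch` and its members are illustrative stand-ins, not the actual Arrow internals:

```cpp
#include <cstdint>
#include <vector>

// Hypothetical sketch, not arrow::compute::RowTableImpl.
struct RowTableSketch {
  // Previously the per-row offsets were uint32_t, which caps the total
  // encoded row data at 2^32 - 1 bytes (~4 GiB). Widening to 64 bits
  // removes that ceiling.
  using offset_type = int64_t;

  std::vector<offset_type> offsets;  // offsets[i] = byte offset of row i
  std::vector<uint8_t> data;         // concatenated var-length row data

  // A single row's length is the difference of adjacent offsets. Even with
  // 64-bit offsets, each individual row is still expected to fit in 32 bits
  // (see the DCHECK added in swiss_join.cc below).
  int64_t row_length(int64_t i) const { return offsets[i + 1] - offsets[i]; }
};
```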
Commits
937ff8b
Widen the offset of row table to 64b
zanmato1984 Jul 23, 2024
ab93a63
More fix
zanmato1984 Jul 24, 2024
e50e266
More fix
zanmato1984 Jul 24, 2024
c7e5225
Fix test name
zanmato1984 Jul 24, 2024
02669e1
Remove header
zanmato1984 Jul 24, 2024
148118a
Support avx2
zanmato1984 Jul 24, 2024
65aa81e
Fix warning
zanmato1984 Jul 24, 2024
2c4d57f
Fix unaligned store
zanmato1984 Jul 25, 2024
db12592
Fix warning
zanmato1984 Jul 25, 2024
5bd2981
Fix warning
zanmato1984 Jul 25, 2024
5f40f97
Add test for compare rows bigger than 4GB
zanmato1984 Jul 25, 2024
9c7c3b7
Fix warning on windows
zanmato1984 Jul 25, 2024
136b588
Refine compare tests
zanmato1984 Jul 26, 2024
9f9db1b
WIP join test
zanmato1984 Jul 26, 2024
26cfd77
Fix
zanmato1984 Jul 26, 2024
c4005d4
Fix refined test
zanmato1984 Jul 26, 2024
ef7c0c7
Add a todo to be fixed for large hash join test
zanmato1984 Jul 26, 2024
f171e54
Hash join test for fixed length type
zanmato1984 Jul 27, 2024
25e86ff
Pass all hash join tests
zanmato1984 Jul 28, 2024
c4ab99a
Refine tests
zanmato1984 Jul 28, 2024
2b0ad20
Fix
zanmato1984 Jul 28, 2024
0f4b770
Refine
zanmato1984 Jul 28, 2024
984b5e6
Fix compile
zanmato1984 Jul 28, 2024
49a4067
Fix warning
zanmato1984 Jul 29, 2024
2ef0b46
Fix warning
zanmato1984 Jul 29, 2024
e65657a
Remove useless fix
zanmato1984 Jul 29, 2024
780e2be
Update gh issue number in tests
zanmato1984 Aug 8, 2024
348ce66
More accurate offset assertion for large row table tests
zanmato1984 Aug 8, 2024
ad0f78e
Add comment about the row length invariant
zanmato1984 Aug 9, 2024
1215355
Make random FixedSizeBinary more backward compatible
zanmato1984 Aug 9, 2024
70b1936
Add comments for non-trivial simd code
zanmato1984 Aug 9, 2024
f23f706
Add comments about swiss join avx2 not wired
zanmato1984 Aug 14, 2024
0082bca
Update doc string for compare columns to rows api
zanmato1984 Aug 16, 2024
ded3e67
Static assert the size of the offset type in the row table where it is assumed
zanmato1984 Aug 16, 2024
192 changes: 192 additions & 0 deletions cpp/src/arrow/acero/hash_join_node_test.cc
@@ -30,6 +30,7 @@
#include "arrow/compute/kernels/test_util.h"
#include "arrow/compute/light_array_internal.h"
#include "arrow/testing/extension_type.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
@@ -40,6 +41,10 @@ using testing::UnorderedElementsAreArray;

namespace arrow {

using arrow::gen::Constant;
using arrow::random::kSeedMax;
using arrow::random::RandomArrayGenerator;
using compute::and_;
using compute::call;
using compute::default_exec_context;
using compute::ExecBatchBuilder;
@@ -3253,5 +3258,192 @@ TEST(HashJoin, ManyJoins) {
ASSERT_OK_AND_ASSIGN(std::ignore, DeclarationToTable(std::move(root)));
}

namespace {

void AssertRowCountEq(Declaration source, int64_t expected) {
Declaration count{"aggregate",
{std::move(source)},
AggregateNodeOptions{/*aggregates=*/{{"count_all", "count(*)"}}}};
ASSERT_OK_AND_ASSIGN(auto batches, DeclarationToExecBatches(std::move(count)));
ASSERT_EQ(batches.batches.size(), 1);
ASSERT_EQ(batches.batches[0].values.size(), 1);
ASSERT_TRUE(batches.batches[0].values[0].is_scalar());
ASSERT_EQ(batches.batches[0].values[0].scalar()->type->id(), Type::INT64);
ASSERT_TRUE(batches.batches[0].values[0].scalar_as<Int64Scalar>().is_valid);
ASSERT_EQ(batches.batches[0].values[0].scalar_as<Int64Scalar>().value, expected);
}

} // namespace

// GH-43495: Test that both the key and the payload of the right side (the build side) are
// fixed length and larger than 4GB, and the 64-bit offset in the hash table can handle it
// correctly.
TEST(HashJoin, LARGE_MEMORY_TEST(BuildSideOver4GBFixedLength)) {
constexpr int64_t k5GB = 5ll * 1024 * 1024 * 1024;
constexpr int fixed_length = 128;
const auto type = fixed_size_binary(fixed_length);
constexpr uint8_t byte_no_match_min = static_cast<uint8_t>('A');
constexpr uint8_t byte_no_match_max = static_cast<uint8_t>('y');
constexpr uint8_t byte_match = static_cast<uint8_t>('z');
const auto value_match =
std::make_shared<FixedSizeBinaryScalar>(std::string(fixed_length, byte_match));
constexpr int16_t num_rows_per_batch_left = 128;
constexpr int16_t num_rows_per_batch_right = 4096;
const int64_t num_batches_left = 8;
const int64_t num_batches_right =
k5GB / (num_rows_per_batch_right * type->byte_width());

// Left side composed of num_batches_left identical batches of num_rows_per_batch_left
// rows of value_match-es.
BatchesWithSchema batches_left;
{
// A column with num_rows_per_batch_left value_match-es.
ASSERT_OK_AND_ASSIGN(auto column,
Constant(value_match)->Generate(num_rows_per_batch_left));

// Use the column as both the key and the payload.
ExecBatch batch({column, column}, num_rows_per_batch_left);
batches_left =
BatchesWithSchema{std::vector<ExecBatch>(num_batches_left, std::move(batch)),
schema({field("l_key", type), field("l_payload", type)})};
}

// Right side composed of num_batches_right identical batches of
// num_rows_per_batch_right rows containing only 1 value_match.
BatchesWithSchema batches_right;
{
// A column with (num_rows_per_batch_right - 1) non-value_match-es (possibly null) and
// 1 value_match.
auto non_matches = RandomArrayGenerator(kSeedMax).FixedSizeBinary(
num_rows_per_batch_right - 1, fixed_length,
/*null_probability =*/0.01, /*min_byte=*/byte_no_match_min,
/*max_byte=*/byte_no_match_max);
ASSERT_OK_AND_ASSIGN(auto match, Constant(value_match)->Generate(1));
ASSERT_OK_AND_ASSIGN(auto column, Concatenate({non_matches, match}));

// Use the column as both the key and the payload.
ExecBatch batch({column, column}, num_rows_per_batch_right);
batches_right =
BatchesWithSchema{std::vector<ExecBatch>(num_batches_right, std::move(batch)),
schema({field("r_key", type), field("r_payload", type)})};
}

Declaration left{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_left.schema),
std::move(batches_left.batches))};

Declaration right{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_right.schema),
std::move(batches_right.batches))};

HashJoinNodeOptions join_opts(JoinType::INNER, /*left_keys=*/{"l_key"},
/*right_keys=*/{"r_key"});
Declaration join{"hashjoin", {std::move(left), std::move(right)}, join_opts};

ASSERT_OK_AND_ASSIGN(auto batches_result, DeclarationToExecBatches(std::move(join)));
Declaration result{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_result.schema),
std::move(batches_result.batches))};

// The row count of hash join should be (number of value_match-es in left side) *
// (number of value_match-es in right side).
AssertRowCountEq(result,
num_batches_left * num_rows_per_batch_left * num_batches_right);

// All rows should be value_match-es.
auto predicate = and_({equal(field_ref("l_key"), literal(value_match)),
equal(field_ref("l_payload"), literal(value_match)),
equal(field_ref("r_key"), literal(value_match)),
equal(field_ref("r_payload"), literal(value_match))});
Declaration filter{"filter", {result}, FilterNodeOptions{std::move(predicate)}};
AssertRowCountEq(std::move(filter),
num_batches_left * num_rows_per_batch_left * num_batches_right);
}
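// To put the sizes above in perspective: num_batches_right =
//   5 GiB / (4096 rows * 128 bytes per row) = 10240,
// so the build side holds 10240 * 4096 = 41,943,040 rows and exactly 5 GiB of
// key data (plus as much payload), which is past the old 32-bit offset limit.
// The probe side has 8 * 128 = 1024 matching rows and each build batch holds
// exactly one matching row, so the expected join output is
// 1024 * 10240 = 10,485,760 rows -- the count asserted above.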

// GH-43495: Test that both the key and the payload of the right side (the build side) are
// var length and larger than 4GB, and the 64-bit offset in the hash table can handle it
// correctly.
TEST(HashJoin, LARGE_MEMORY_TEST(BuildSideOver4GBVarLength)) {
constexpr int64_t k5GB = 5ll * 1024 * 1024 * 1024;
const auto type = utf8();
constexpr int value_no_match_length_min = 128;
constexpr int value_no_match_length_max = 129;
constexpr int value_match_length = 130;
const auto value_match =
std::make_shared<StringScalar>(std::string(value_match_length, 'X'));
constexpr int16_t num_rows_per_batch_left = 128;
constexpr int16_t num_rows_per_batch_right = 4096;
const int64_t num_batches_left = 8;
const int64_t num_batches_right =
k5GB / (num_rows_per_batch_right * value_no_match_length_min);

// Left side composed of num_batches_left identical batches of num_rows_per_batch_left
// rows of value_match-es.
BatchesWithSchema batches_left;
{
// A column with num_rows_per_batch_left value_match-es.
ASSERT_OK_AND_ASSIGN(auto column,
Constant(value_match)->Generate(num_rows_per_batch_left));

// Use the column as both the key and the payload.
ExecBatch batch({column, column}, num_rows_per_batch_left);
batches_left =
BatchesWithSchema{std::vector<ExecBatch>(num_batches_left, std::move(batch)),
schema({field("l_key", type), field("l_payload", type)})};
}

// Right side composed of num_batches_right identical batches of
// num_rows_per_batch_right rows containing only 1 value_match.
BatchesWithSchema batches_right;
{
// A column with (num_rows_per_batch_right - 1) non-value_match-es (possibly null) and
// 1 value_match.
auto non_matches =
RandomArrayGenerator(kSeedMax).String(num_rows_per_batch_right - 1,
/*min_length=*/value_no_match_length_min,
/*max_length=*/value_no_match_length_max,
/*null_probability =*/0.01);
ASSERT_OK_AND_ASSIGN(auto match, Constant(value_match)->Generate(1));
ASSERT_OK_AND_ASSIGN(auto column, Concatenate({non_matches, match}));

// Use the column as both the key and the payload.
ExecBatch batch({column, column}, num_rows_per_batch_right);
batches_right =
BatchesWithSchema{std::vector<ExecBatch>(num_batches_right, std::move(batch)),
schema({field("r_key", type), field("r_payload", type)})};
}

Declaration left{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_left.schema),
std::move(batches_left.batches))};

Declaration right{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_right.schema),
std::move(batches_right.batches))};

HashJoinNodeOptions join_opts(JoinType::INNER, /*left_keys=*/{"l_key"},
/*right_keys=*/{"r_key"});
Declaration join{"hashjoin", {std::move(left), std::move(right)}, join_opts};

ASSERT_OK_AND_ASSIGN(auto batches_result, DeclarationToExecBatches(std::move(join)));
Declaration result{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_result.schema),
std::move(batches_result.batches))};

// The row count of hash join should be (number of value_match-es in left side) *
// (number of value_match-es in right side).
AssertRowCountEq(result,
num_batches_left * num_rows_per_batch_left * num_batches_right);

// All rows should be value_match-es.
auto predicate = and_({equal(field_ref("l_key"), literal(value_match)),
equal(field_ref("l_payload"), literal(value_match)),
equal(field_ref("r_key"), literal(value_match)),
equal(field_ref("r_payload"), literal(value_match))});
Declaration filter{"filter", {result}, FilterNodeOptions{std::move(predicate)}};
AssertRowCountEq(std::move(filter),
num_batches_left * num_rows_per_batch_left * num_batches_right);
}
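// The arithmetic mirrors the fixed-length test: num_batches_right =
//   5 GiB / (4096 rows * 128 bytes minimum per string) = 10240,
// so the build-side character data is at least ~5 GiB and the expected join
// output is again 8 * 128 * 10240 = 10,485,760 rows.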

} // namespace acero
} // namespace arrow
26 changes: 11 additions & 15 deletions cpp/src/arrow/acero/swiss_join.cc
@@ -122,7 +122,7 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
if (!is_fixed_length_column) {
int varbinary_column_id = VarbinaryColumnId(rows.metadata(), column_id);
const uint8_t* row_ptr_base = rows.data(2);
const uint32_t* row_offsets = rows.offsets();
const RowTableImpl::offset_type* row_offsets = rows.offsets();
uint32_t field_offset_within_row, field_length;

if (varbinary_column_id == 0) {
@@ -173,7 +173,7 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
// Case 4: This is a fixed length column in a varying length row
//
const uint8_t* row_ptr_base = rows.data(2) + field_offset_within_row;
const uint32_t* row_offsets = rows.offsets();
const RowTableImpl::offset_type* row_offsets = rows.offsets();
for (int i = 0; i < num_rows; ++i) {
uint32_t row_id = row_ids[i];
const uint8_t* row_ptr = row_ptr_base + row_offsets[row_id];
@@ -473,17 +473,10 @@ Status RowArrayMerge::PrepareForMerge(RowArray* target,
(*first_target_row_id)[sources.size()] = num_rows;
}

if (num_bytes > std::numeric_limits<uint32_t>::max()) {
return Status::Invalid(
"There are more than 2^32 bytes of key data. Acero cannot "
"process a join of this magnitude");
}

// Allocate target memory
//
target->rows_.Clean();
RETURN_NOT_OK(target->rows_.AppendEmpty(static_cast<uint32_t>(num_rows),
static_cast<uint32_t>(num_bytes)));
RETURN_NOT_OK(target->rows_.AppendEmpty(static_cast<uint32_t>(num_rows), num_bytes));

// In case of varying length rows,
// initialize the first row offset for each range of rows corresponding to a
@@ -565,15 +558,15 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
int64_t first_target_row_offset,
const int64_t* source_rows_permutation) {
int64_t num_source_rows = source.length();
uint32_t* target_offsets = target->mutable_offsets();
const uint32_t* source_offsets = source.offsets();
RowTableImpl::offset_type* target_offsets = target->mutable_offsets();
const RowTableImpl::offset_type* source_offsets = source.offsets();

// Permutation of source rows is optional.
//
if (!source_rows_permutation) {
int64_t target_row_offset = first_target_row_offset;
for (int64_t i = 0; i < num_source_rows; ++i) {
target_offsets[first_target_row_id + i] = static_cast<uint32_t>(target_row_offset);
target_offsets[first_target_row_id + i] = target_row_offset;
target_row_offset += source_offsets[i + 1] - source_offsets[i];
}
// We purposefully skip outputting of N+1 offset, to allow concurrent
@@ -593,7 +586,10 @@
int64_t source_row_id = source_rows_permutation[i];
const uint64_t* source_row_ptr = reinterpret_cast<const uint64_t*>(
source.data(2) + source_offsets[source_row_id]);
uint32_t length = source_offsets[source_row_id + 1] - source_offsets[source_row_id];
int64_t length = source_offsets[source_row_id + 1] - source_offsets[source_row_id];
// Though the row offset is 64-bit, the length of a single row must be 32-bit as
// required by current row table implementation.
DCHECK_LE(length, std::numeric_limits<uint32_t>::max());

// Rows should be 64-bit aligned.
// In that case we can copy them using a sequence of 64-bit read/writes.
@@ -604,7 +600,7 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
*target_row_ptr++ = *source_row_ptr++;
}

target_offsets[first_target_row_id + i] = static_cast<uint32_t>(target_row_offset);
target_offsets[first_target_row_id + i] = target_row_offset;
target_row_offset += length;
}
}
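To make the overflow concrete: with a 32-bit running offset, appending more than 2^32 bytes of row data wraps the offset around, which is why PrepareForMerge previously rejected joins with more than 2^32 bytes of key data outright (the guard removed above). A standalone sketch of the wrap-around, using made-up row counts rather than any Arrow API:

```cpp
#include <cstdint>
#include <iostream>

int main() {
  constexpr int64_t row_length = 128;       // bytes per row, as in the tests
  constexpr int64_t num_rows = 42'000'000;  // ~41.9M rows, a bit over 5 GiB

  uint32_t offset32 = 0;  // the pre-PR offset width
  int64_t offset64 = 0;   // the widened offset
  for (int64_t i = 0; i < num_rows; ++i) {
    offset32 += static_cast<uint32_t>(row_length);  // silently wraps past 2^32
    offset64 += row_length;
  }
  std::cout << "32-bit running offset: " << offset32 << " (wrapped)\n"
            << "64-bit running offset: " << offset64 << "\n";
  return 0;
}
```

Note that only the running offset is widened; as the DCHECK in CopyVaryingLength documents, the length of any single row must still fit in 32 bits.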