Skip to content

Commit

Permalink
ARROW-15498: [C++][Compute] Implement Bloom filter pushdown between h…
Browse files Browse the repository at this point in the history
…ash joins

This adds Bloom filter pushdown between hash join nodes.

Closes #12289 from save-buffer/sasha_bloom_pushdown

Lead-authored-by: Sasha Krassovsky <krassovskysasha@gmail.com>
Co-authored-by: michalursa <michal@ursacomputing.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
2 people authored and westonpace committed May 18, 2022
1 parent 6faee47 commit 0742f78
Show file tree
Hide file tree
Showing 22 changed files with 889 additions and 144 deletions.
1 change: 0 additions & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ if(ARROW_COMPUTE)
compute/exec/key_encode.cc
compute/exec/key_hash.cc
compute/exec/key_map.cc
compute/exec/options.cc
compute/exec/order_by_impl.cc
compute/exec/partition_util.cc
compute/exec/options.cc
Expand Down
37 changes: 23 additions & 14 deletions cpp/src/arrow/compute/exec/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,21 +311,22 @@ Status BloomFilterBuilder_SingleThreaded::Begin(size_t /*num_threads*/,
}

Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
int num_rows,
int64_t num_rows,
const uint32_t* hashes) {
PushNextBatchImp(num_rows, hashes);
return Status::OK();
}

Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
int num_rows,
int64_t num_rows,
const uint64_t* hashes) {
PushNextBatchImp(num_rows, hashes);
return Status::OK();
}

template <typename T>
void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int num_rows, const T* hashes) {
void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int64_t num_rows,
const T* hashes) {
build_target_->Insert(hardware_flags_, num_rows, hashes);
}

Expand All @@ -340,29 +341,40 @@ Status BloomFilterBuilder_Parallel::Begin(size_t num_threads, int64_t hardware_f
log_num_prtns_ = std::min(kMaxLogNumPrtns, bit_util::Log2(num_threads));

thread_local_states_.resize(num_threads);
prtn_locks_.Init(1 << log_num_prtns_);
prtn_locks_.Init(num_threads, 1 << log_num_prtns_);

RETURN_NOT_OK(build_target->CreateEmpty(num_rows, pool));

return Status::OK();
}

Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
const uint32_t* hashes) {
PushNextBatchImp(thread_id, num_rows, hashes);
return Status::OK();
}

Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
const uint64_t* hashes) {
PushNextBatchImp(thread_id, num_rows, hashes);
return Status::OK();
}

template <typename T>
void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_rows,
void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int64_t num_rows,
const T* hashes) {
int num_prtns = 1 << log_num_prtns_;
// Partition IDs are calculated using the higher bits of the block ID. This
// ensures that each block is contained entirely within a partition and prevents
// concurrent access to a block.
constexpr int kLogBlocksKeptTogether = 7;
constexpr int kPrtnIdBitOffset =
BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;

const int log_num_prtns_max =
std::max(0, build_target_->log_num_blocks() - kLogBlocksKeptTogether);
const int log_num_prtns_mod = std::min(log_num_prtns_, log_num_prtns_max);
int num_prtns = 1 << log_num_prtns_mod;

ThreadLocalState& local_state = thread_local_states_[thread_id];
local_state.partition_ranges.resize(num_prtns + 1);
local_state.partitioned_hashes_64.resize(num_rows);
Expand All @@ -373,13 +385,10 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row

PartitionSort::Eval(
num_rows, num_prtns, partition_ranges,
[hashes, num_prtns](int row_id) {
constexpr int kLogBlocksKeptTogether = 7;
constexpr int kPrtnIdBitOffset =
BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;
[=](int64_t row_id) {
return (hashes[row_id] >> (kPrtnIdBitOffset)) & (num_prtns - 1);
},
[hashes, partitioned_hashes](int row_id, int output_pos) {
[=](int64_t row_id, int output_pos) {
partitioned_hashes[output_pos] = hashes[row_id];
});

Expand All @@ -393,7 +402,7 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row
while (num_unprocessed_partitions > 0) {
int locked_prtn_id;
int locked_prtn_id_pos;
prtn_locks_.AcquirePartitionLock(num_unprocessed_partitions,
prtn_locks_.AcquirePartitionLock(thread_id, num_unprocessed_partitions,
unprocessed_partition_ids,
/*limit_retries=*/false, /*max_retries=*/-1,
&locked_prtn_id, &locked_prtn_id_pos);
Expand Down
22 changes: 12 additions & 10 deletions cpp/src/arrow/compute/exec/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,49 +261,51 @@ class ARROW_EXPORT BloomFilterBuilder {
int64_t num_rows, int64_t num_batches,
BlockedBloomFilter* build_target) = 0;
virtual int64_t num_tasks() const { return 0; }
virtual Status PushNextBatch(size_t thread_index, int num_rows,
virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
const uint32_t* hashes) = 0;
virtual Status PushNextBatch(size_t thread_index, int num_rows,
virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
const uint64_t* hashes) = 0;
virtual void CleanUp() {}
static std::unique_ptr<BloomFilterBuilder> Make(BloomFilterBuildStrategy strategy);
};

class BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
class ARROW_EXPORT BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
public:
Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
int64_t num_rows, int64_t num_batches,
BlockedBloomFilter* build_target) override;

Status PushNextBatch(size_t /*thread_index*/, int num_rows,
Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
const uint32_t* hashes) override;

Status PushNextBatch(size_t /*thread_index*/, int num_rows,
Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
const uint64_t* hashes) override;

private:
template <typename T>
void PushNextBatchImp(int num_rows, const T* hashes);
void PushNextBatchImp(int64_t num_rows, const T* hashes);

int64_t hardware_flags_;
BlockedBloomFilter* build_target_;
};

class BloomFilterBuilder_Parallel : public BloomFilterBuilder {
class ARROW_EXPORT BloomFilterBuilder_Parallel : public BloomFilterBuilder {
public:
Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
int64_t num_rows, int64_t num_batches,
BlockedBloomFilter* build_target) override;

Status PushNextBatch(size_t thread_id, int num_rows, const uint32_t* hashes) override;
Status PushNextBatch(size_t thread_id, int64_t num_rows,
const uint32_t* hashes) override;

Status PushNextBatch(size_t thread_id, int num_rows, const uint64_t* hashes) override;
Status PushNextBatch(size_t thread_id, int64_t num_rows,
const uint64_t* hashes) override;

void CleanUp() override;

private:
template <typename T>
void PushNextBatchImp(size_t thread_id, int num_rows, const T* hashes);
void PushNextBatchImp(size_t thread_id, int64_t num_rows, const T* hashes);

int64_t hardware_flags_;
BlockedBloomFilter* build_target_;
Expand Down
116 changes: 92 additions & 24 deletions cpp/src/arrow/compute/exec/bloom_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <unordered_set>
#include "arrow/compute/exec/bloom_filter.h"
#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/exec/test_util.h"
#include "arrow/compute/exec/util.h"
#include "arrow/util/bitmap_ops.h"
Expand All @@ -32,39 +33,106 @@
namespace arrow {
namespace compute {

Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
MemoryPool* pool, int64_t num_rows,
std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
BlockedBloomFilter* target) {
constexpr int batch_size_max = 32 * 1024;
int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);

auto builder = BloomFilterBuilder::Make(strategy);

std::vector<uint32_t> thread_local_hashes32;
std::vector<uint64_t> thread_local_hashes64;
thread_local_hashes32.resize(batch_size_max);
thread_local_hashes64.resize(batch_size_max);

RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
bit_util::CeilDiv(num_rows, batch_size_max), target));

for (int64_t i = 0; i < num_batches; ++i) {
constexpr int kBatchSizeMax = 32 * 1024;
Status BuildBloomFilter_Serial(
std::unique_ptr<BloomFilterBuilder>& builder, int64_t num_rows, int64_t num_batches,
std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
BlockedBloomFilter* target) {
std::vector<uint32_t> hashes32(kBatchSizeMax);
std::vector<uint64_t> hashes64(kBatchSizeMax);
for (int64_t i = 0; i < num_batches; i++) {
size_t thread_index = 0;
int batch_size = static_cast<int>(
std::min(num_rows - i * batch_size_max, static_cast<int64_t>(batch_size_max)));
std::min(num_rows - i * kBatchSizeMax, static_cast<int64_t>(kBatchSizeMax)));
if (target->NumHashBitsUsed() > 32) {
uint64_t* hashes = thread_local_hashes64.data();
get_hash64_impl(i * batch_size_max, batch_size, hashes);
uint64_t* hashes = hashes64.data();
get_hash64_impl(i * kBatchSizeMax, batch_size, hashes);
RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
} else {
uint32_t* hashes = thread_local_hashes32.data();
get_hash32_impl(i * batch_size_max, batch_size, hashes);
uint32_t* hashes = hashes32.data();
get_hash32_impl(i * kBatchSizeMax, batch_size, hashes);
RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
}
}
return Status::OK();
}

Status BuildBloomFilter_Parallel(
std::unique_ptr<BloomFilterBuilder>& builder, size_t num_threads, int64_t num_rows,
int64_t num_batches, std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
BlockedBloomFilter* target) {
ThreadIndexer thread_indexer;
std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();
std::vector<std::vector<uint32_t>> thread_local_hashes32(num_threads);
std::vector<std::vector<uint64_t>> thread_local_hashes64(num_threads);
for (std::vector<uint32_t>& h : thread_local_hashes32) h.resize(kBatchSizeMax);
for (std::vector<uint64_t>& h : thread_local_hashes64) h.resize(kBatchSizeMax);

std::condition_variable cv;
std::mutex mutex;
auto group = scheduler->RegisterTaskGroup(
[&](size_t thread_index, int64_t task_id) -> Status {
int batch_size = static_cast<int>(std::min(num_rows - task_id * kBatchSizeMax,
static_cast<int64_t>(kBatchSizeMax)));
if (target->NumHashBitsUsed() > 32) {
uint64_t* hashes = thread_local_hashes64[thread_index].data();
get_hash64_impl(task_id * kBatchSizeMax, batch_size, hashes);
RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
} else {
uint32_t* hashes = thread_local_hashes32[thread_index].data();
get_hash32_impl(task_id * kBatchSizeMax, batch_size, hashes);
RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
}
return Status::OK();
},
[&](size_t thread_index) -> Status {
std::unique_lock<std::mutex> lk(mutex);
cv.notify_all();
return Status::OK();
});
scheduler->RegisterEnd();
auto tp = arrow::internal::GetCpuThreadPool();
RETURN_NOT_OK(scheduler->StartScheduling(
0,
[&](std::function<Status(size_t)> func) -> Status {
return tp->Spawn([&, func]() {
size_t tid = thread_indexer();
ARROW_DCHECK_OK(func(tid));
});
},
static_cast<int>(num_threads), false));
{
std::unique_lock<std::mutex> lk(mutex);
RETURN_NOT_OK(scheduler->StartTaskGroup(0, group, num_batches));
cv.wait(lk);
}
return Status::OK();
}

Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
MemoryPool* pool, int64_t num_rows,
std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
BlockedBloomFilter* target) {
int64_t num_batches = bit_util::CeilDiv(num_rows, kBatchSizeMax);

bool is_serial = strategy == BloomFilterBuildStrategy::SINGLE_THREADED;
auto builder = BloomFilterBuilder::Make(strategy);

size_t num_threads = is_serial ? 1 : arrow::GetCpuThreadPoolCapacity();
RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
bit_util::CeilDiv(num_rows, kBatchSizeMax), target));

if (is_serial)
RETURN_NOT_OK(BuildBloomFilter_Serial(builder, num_rows, num_batches,
std::move(get_hash32_impl),
std::move(get_hash64_impl), target));
else
RETURN_NOT_OK(BuildBloomFilter_Parallel(builder, num_threads, num_rows, num_batches,
std::move(get_hash32_impl),
std::move(get_hash64_impl), target));
builder->CleanUp();

return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ struct ExecPlanImpl : public ExecPlan {

// producers precede consumers
sorted_nodes_ = TopoSort();
for (ExecNode* node : sorted_nodes_) {
RETURN_NOT_OK(node->PrepareToProduce());
}

std::vector<Future<>> futures;

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ class ARROW_EXPORT ExecNode {
// A node with multiple outputs will also need to ensure it is applying backpressure if
// any of its outputs is asking to pause

/// \brief Perform any needed initialization
///
/// This hook performs any actions in between creation of ExecPlan and the call to
/// StartProducing. An example could be Bloom filter pushdown. The order of ExecNodes
/// that executes this method is undefined, but the calls are made synchronously.
///
/// At this point a node can rely on all inputs & outputs (and the input schemas)
/// being well defined.
virtual Status PrepareToProduce() { return Status::OK(); }

/// \brief Start producing
///
/// This must only be called once. If this fails, then other lifecycle
Expand Down
Loading

0 comments on commit 0742f78

Please sign in to comment.