Skip to content

Commit

Permalink
Fix weston comments
Browse files Browse the repository at this point in the history
  • Loading branch information
save-buffer committed May 9, 2022
1 parent 40bf623 commit ad92449
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 12 deletions.
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/exec/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,12 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int64_t num

PartitionSort::Eval(
num_rows, num_prtns, partition_ranges,
[hashes, num_prtns, kLogBlocksKeptTogether](int64_t row_id) {
[=](int64_t row_id) {
constexpr int kPrtnIdBitOffset =
BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;
return (hashes[row_id] >> (kPrtnIdBitOffset)) & (num_prtns - 1);
},
[hashes, partitioned_hashes](int64_t row_id, int output_pos) {
[=](int64_t row_id, int output_pos) {
partitioned_hashes[output_pos] = hashes[row_id];
});

Expand Down
4 changes: 1 addition & 3 deletions cpp/src/arrow/compute/exec/bloom_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,9 @@ Status BuildBloomFilter_Parallel(
return Status::OK();
},
[&](size_t thread_index) -> Status {
{
std::unique_lock<std::mutex> lk(mutex);
cv.notify_all();
}
return Status::OK();
return Status::OK();
});
scheduler->RegisterEnd();
auto tp = arrow::internal::GetCpuThreadPool();
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,14 @@ class ARROW_EXPORT ExecNode {
// A node with multiple outputs will also need to ensure it is applying backpressure if
// any of its outputs is asking to pause

/// \brief Steps performed immediately before StartProducing is called
/// \brief Perform any needed initialization
///
/// This hook performs any actions needed between the creation of the ExecPlan and the
/// call to StartProducing. An example could be Bloom filter pushdown. The order in which
/// ExecNodes execute this method is undefined, but the calls are made synchronously.
///
/// At this point a node can rely on all inputs & outputs (and the input schemas)
/// being well defined.
virtual Status PrepareToProduce() { return Status::OK(); }

/// \brief Start producing
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/arrow/compute/exec/hash_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ class HashJoinBasicImpl : public HashJoinImpl {
scheduler_->Abort(std::move(pos_abort_callback));
}

// Called by a downstream node after it has constructed a Bloom filter
// that this node can use to filter its inputs.
Status PushBloomFilter(size_t thread_index, std::unique_ptr<BlockedBloomFilter> filter,
std::vector<int> column_map) override {
bool proceed;
Expand Down Expand Up @@ -662,7 +664,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
std::vector<uint8_t> bv(bit_vector_bytes);

RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
// Start with full selection for the current minibatch
// Start with full selection for the current batch
memset(selected.data(), 0xff, bit_vector_bytes);
for (size_t ifilter = 0; ifilter < num_expected_bloom_filters_; ifilter++) {
std::vector<Datum> keys(bloom_filter_column_maps_[ifilter].size());
Expand Down Expand Up @@ -790,6 +792,8 @@ class HashJoinBasicImpl : public HashJoinImpl {
return Status::Cancelled("Hash join cancelled");
}

right_batches_.clear();

bool proceed;
{
std::lock_guard<std::mutex> lock(left_batches_mutex_);
Expand All @@ -801,7 +805,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
}
if (proceed) RETURN_NOT_OK(ProbeQueuedBatches(thread_index));

right_batches_.clear();
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/hash_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ TEST(HashJoin, Random) {
Random64Bit rng(42);
#if defined(THREAD_SANITIZER) || defined(ARROW_VALGRIND)
const int num_tests = 15;
#elsif defined(ADDRESS_SANITIZER)
#elif defined(ADDRESS_SANITIZER)
const int num_tests = 50;
#else
const int num_tests = 100;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/exec/task_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace compute {
// whenever it is modified by a concurrent thread on a different CPU core.
//
template <typename T>
class ARROW_EXPORT AtomicWithPadding {
class AtomicWithPadding {
private:
static constexpr int kCacheLineSize = 64;
uint8_t padding_before[kCacheLineSize];
Expand All @@ -53,7 +53,7 @@ class ARROW_EXPORT AtomicWithPadding {
//
// Also allows for executing next pending tasks immediately using a caller thread.
//
class ARROW_EXPORT TaskScheduler {
class TaskScheduler {
public:
using TaskImpl = std::function<Status(size_t, int64_t)>;
using TaskGroupContinuationImpl = std::function<Status(size_t)>;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ class AtomicCounter {
std::atomic<bool> complete_{false};
};

class ARROW_EXPORT ThreadIndexer {
class ThreadIndexer {
public:
size_t operator()();

Expand Down

0 comments on commit ad92449

Please sign in to comment.