From d9ef519f458fc5989fd15f0af49f069c34110c35 Mon Sep 17 00:00:00 2001 From: michalursa Date: Tue, 19 Oct 2021 13:46:59 +0900 Subject: [PATCH] ARROW-14211: [C++][Compute] Fixing thread sanitizer problems in hash join node Fixing 3 issues: - one in SchemaProjectionMaps - I simplified all of the code to get rid of thread synchronization at all - one in TaskScheduler - added (unnecessary) mutex - one in HashJoinImpl - switching from shared byte vector to local bit vectors and merge (for recording if a match for a hash table row has been seen) Closes #11350 from michalursa/ARROW-14211-hash-join-tsan Authored-by: michalursa Signed-off-by: Sutou Kouhei --- cpp/src/arrow/compute/exec/hash_join.cc | 104 ++++++++++++------ cpp/src/arrow/compute/exec/hash_join_node.cc | 6 +- .../arrow/compute/exec/hash_join_node_test.cc | 33 ++++-- cpp/src/arrow/compute/exec/schema_util.h | 93 ++++++++++------ cpp/src/arrow/compute/exec/task_util.cc | 7 +- cpp/src/arrow/compute/exec/util_test.cc | 6 +- 6 files changed, 160 insertions(+), 89 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 9500beb666a38..8bbd81824510e 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -33,6 +33,9 @@ namespace compute { using internal::RowEncoder; class HashJoinBasicImpl : public HashJoinImpl { + private: + struct ThreadLocalState; + public: Status InputReceived(size_t thread_index, int side, ExecBatch batch) override { if (cancelled_) { @@ -91,6 +94,7 @@ class HashJoinBasicImpl : public HashJoinImpl { local_states_.resize(num_threads); for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; + local_states_[i].is_has_match_initialized = false; } has_hash_table_ = false; @@ -150,16 +154,16 @@ class HashJoinBasicImpl : public HashJoinImpl { int num_cols = schema_mgr_->proj_maps[side].num_cols(projection_handle); projected.values.resize(num_cols); - const int* to_input = + auto to_input = schema_mgr_->proj_maps[side].map(projection_handle, HashJoinProjection::INPUT); for (int icol = 0; icol < num_cols; ++icol) { - projected.values[icol] = batch.values[to_input[icol]]; + projected.values[icol] = batch.values[to_input.get(icol)]; } return encoder->EncodeAndAppend(projected); } - void ProbeBatch_Lookup(const RowEncoder& exec_batch_keys, + void ProbeBatch_Lookup(ThreadLocalState* local_state, const RowEncoder& exec_batch_keys, const std::vector& non_null_bit_vectors, const std::vector& non_null_bit_vector_offsets, std::vector* output_match, @@ -167,6 +171,9 @@ class HashJoinBasicImpl : public HashJoinImpl { std::vector* output_match_left, std::vector* output_match_right) { ARROW_DCHECK(has_hash_table_); + + InitHasMatchIfNeeded(local_state); + int num_cols = static_cast(non_null_bit_vectors.size()); for (int32_t irow = 0; irow < exec_batch_keys.num_rows(); ++irow) { // Apply null key filtering @@ -191,7 +198,8 @@ class HashJoinBasicImpl : public HashJoinImpl { for (auto it = range.first; it != range.second; ++it) { output_match_left->push_back(irow); output_match_right->push_back(it->second); - has_match_[it->second] = 0xFF; + // Mark row in hash table as having a match + BitUtil::SetBit(local_state->has_match.data(), it->second); has_match = true; } if (!has_match) { @@ -215,46 +223,47 @@ class HashJoinBasicImpl : public HashJoinImpl { ARROW_DCHECK((opt_right_payload == nullptr) == (schema_mgr_->proj_maps[1].num_cols(HashJoinProjection::PAYLOAD) == 0)); result.values.resize(num_out_cols_left + num_out_cols_right); - const int* from_key = schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT, - HashJoinProjection::KEY); - const int* from_payload = schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT, - HashJoinProjection::PAYLOAD); + auto from_key = schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT, + HashJoinProjection::KEY); + auto from_payload = schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT, + HashJoinProjection::PAYLOAD); for (int icol = 0; icol < num_out_cols_left; ++icol) { - bool is_from_key = (from_key[icol] != HashJoinSchema::kMissingField()); - bool is_from_payload = (from_payload[icol] != HashJoinSchema::kMissingField()); + bool is_from_key = (from_key.get(icol) != HashJoinSchema::kMissingField()); + bool is_from_payload = (from_payload.get(icol) != HashJoinSchema::kMissingField()); ARROW_DCHECK(is_from_key != is_from_payload); ARROW_DCHECK(!is_from_key || (opt_left_key && - from_key[icol] < static_cast(opt_left_key->values.size()) && + from_key.get(icol) < static_cast(opt_left_key->values.size()) && opt_left_key->length == batch_size_next)); ARROW_DCHECK( !is_from_payload || (opt_left_payload && - from_payload[icol] < static_cast(opt_left_payload->values.size()) && + from_payload.get(icol) < static_cast(opt_left_payload->values.size()) && opt_left_payload->length == batch_size_next)); - result.values[icol] = is_from_key ? opt_left_key->values[from_key[icol]] - : opt_left_payload->values[from_payload[icol]]; + result.values[icol] = is_from_key + ? opt_left_key->values[from_key.get(icol)] + : opt_left_payload->values[from_payload.get(icol)]; } from_key = schema_mgr_->proj_maps[1].map(HashJoinProjection::OUTPUT, HashJoinProjection::KEY); from_payload = schema_mgr_->proj_maps[1].map(HashJoinProjection::OUTPUT, HashJoinProjection::PAYLOAD); for (int icol = 0; icol < num_out_cols_right; ++icol) { - bool is_from_key = (from_key[icol] != HashJoinSchema::kMissingField()); - bool is_from_payload = (from_payload[icol] != HashJoinSchema::kMissingField()); + bool is_from_key = (from_key.get(icol) != HashJoinSchema::kMissingField()); + bool is_from_payload = (from_payload.get(icol) != HashJoinSchema::kMissingField()); ARROW_DCHECK(is_from_key != is_from_payload); ARROW_DCHECK(!is_from_key || (opt_right_key && - from_key[icol] < static_cast(opt_right_key->values.size()) && + from_key.get(icol) < static_cast(opt_right_key->values.size()) && opt_right_key->length == batch_size_next)); ARROW_DCHECK( !is_from_payload || (opt_right_payload && - from_payload[icol] < static_cast(opt_right_payload->values.size()) && + from_payload.get(icol) < static_cast(opt_right_payload->values.size()) && opt_right_payload->length == batch_size_next)); result.values[num_out_cols_left + icol] = - is_from_key ? opt_right_key->values[from_key[icol]] - : opt_right_payload->values[from_payload[icol]]; + is_from_key ? opt_right_key->values[from_key.get(icol)] + : opt_right_payload->values[from_payload.get(icol)]; } output_batch_callback_(std::move(result)); @@ -384,10 +393,10 @@ class HashJoinBasicImpl : public HashJoinImpl { int num_key_cols = schema_mgr_->proj_maps[0].num_cols(HashJoinProjection::KEY); non_null_bit_vectors.resize(num_key_cols); non_null_bit_vector_offsets.resize(num_key_cols); - const int* from_batch = + auto from_batch = schema_mgr_->proj_maps[0].map(HashJoinProjection::KEY, HashJoinProjection::INPUT); for (int i = 0; i < num_key_cols; ++i) { - int input_col_id = from_batch[i]; + int input_col_id = from_batch.get(i); const uint8_t* non_nulls = nullptr; int64_t offset = 0; if (batch[input_col_id].array()->buffers[0] != NULLPTR) { @@ -398,7 +407,7 @@ class HashJoinBasicImpl : public HashJoinImpl { non_null_bit_vector_offsets[i] = offset; } - ProbeBatch_Lookup(local_state.exec_batch_keys, non_null_bit_vectors, + ProbeBatch_Lookup(&local_state, local_state.exec_batch_keys, non_null_bit_vectors, non_null_bit_vector_offsets, &local_state.match, &local_state.no_match, &local_state.match_left, &local_state.match_right); @@ -446,11 +455,6 @@ class HashJoinBasicImpl : public HashJoinImpl { hash_table_.insert(std::make_pair(hash_table_keys_.encoded_row(irow), irow)); } } - if (!hash_table_empty_) { - int32_t num_rows = hash_table_keys_.num_rows(); - has_match_.resize(num_rows); - memset(has_match_.data(), 0, num_rows); - } } return Status::OK(); } @@ -563,9 +567,9 @@ class HashJoinBasicImpl : public HashJoinImpl { id_right.clear(); bool use_left = false; - uint8_t match_search_value = (join_type_ == JoinType::RIGHT_SEMI) ? 0xFF : 0x00; + bool match_search_value = (join_type_ == JoinType::RIGHT_SEMI); for (int32_t row_id = start_row_id; row_id < end_row_id; ++row_id) { - if (has_match_[row_id] == match_search_value) { + if (BitUtil::GetBit(has_match_.data(), row_id) == match_search_value) { id_right.push_back(row_id); } } @@ -607,16 +611,13 @@ class HashJoinBasicImpl : public HashJoinImpl { } Status ScanHashTable(size_t thread_index) { + MergeHasMatch(); return scheduler_->StartTaskGroup(thread_index, task_group_scan_, ScanHashTable_num_tasks()); } bool QueueBatchIfNeeded(int side, ExecBatch batch) { if (side == 0) { - if (has_hash_table_) { - return false; - } - std::lock_guard lock(left_batches_mutex_); if (has_hash_table_) { return false; @@ -636,6 +637,39 @@ class HashJoinBasicImpl : public HashJoinImpl { return ScanHashTable(thread_index); } + void InitHasMatchIfNeeded(ThreadLocalState* local_state) { + if (local_state->is_has_match_initialized) { + return; + } + if (!hash_table_empty_) { + int32_t num_rows = hash_table_keys_.num_rows(); + local_state->has_match.resize(BitUtil::BytesForBits(num_rows)); + memset(local_state->has_match.data(), 0, BitUtil::BytesForBits(num_rows)); + } + local_state->is_has_match_initialized = true; + } + + void MergeHasMatch() { + if (hash_table_empty_) { + return; + } + + int32_t num_rows = hash_table_keys_.num_rows(); + has_match_.resize(BitUtil::BytesForBits(num_rows)); + memset(has_match_.data(), 0, BitUtil::BytesForBits(num_rows)); + + for (size_t tid = 0; tid < local_states_.size(); ++tid) { + if (!local_states_[tid].is_initialized) { + continue; + } + if (!local_states_[tid].is_has_match_initialized) { + continue; + } + arrow::internal::BitmapOr(has_match_.data(), 0, local_states_[tid].has_match.data(), + 0, num_rows, 0, has_match_.data()); + } + } + static constexpr int64_t hash_table_scan_unit_ = 32 * 1024; static constexpr int64_t output_batch_size_ = 32 * 1024; @@ -666,6 +700,8 @@ class HashJoinBasicImpl : public HashJoinImpl { std::vector no_match; std::vector match_left; std::vector match_right; + bool is_has_match_initialized; + std::vector has_match; }; std::vector local_states_; diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index ff87aa47fae7a..3e02054fbedf1 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -253,9 +253,9 @@ std::shared_ptr HashJoinSchema::MakeOutputSchema( for (int i = 0; i < left_size + right_size; ++i) { bool is_left = (i < left_size); int side = (is_left ? 0 : 1); - int input_field_id = - proj_maps[side].map(HashJoinProjection::OUTPUT, - HashJoinProjection::INPUT)[is_left ? i : i - left_size]; + int input_field_id = proj_maps[side] + .map(HashJoinProjection::OUTPUT, HashJoinProjection::INPUT) + .get(is_left ? i : i - left_size); const std::string& input_field_name = proj_maps[side].field_name(HashJoinProjection::INPUT, input_field_id); const std::shared_ptr& input_data_type = diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc index 4c1954d8ab2c3..a5410b0d37a22 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc @@ -921,24 +921,31 @@ void HashJoinWithExecPlan(Random64Bit& rng, bool parallel, ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); - Declaration join{"hashjoin", join_options}; - // add left source BatchesWithSchema l_batches = TableToBatches(rng, num_batches_l, l, "l_"); - join.inputs.emplace_back(Declaration{ - "source", SourceNodeOptions{l_batches.schema, l_batches.gen(parallel, - /*slow=*/false)}}); + ASSERT_OK_AND_ASSIGN( + ExecNode * l_source, + MakeExecNode("source", plan.get(), {}, + SourceNodeOptions{l_batches.schema, l_batches.gen(parallel, + /*slow=*/false)})); + // add right source BatchesWithSchema r_batches = TableToBatches(rng, num_batches_r, r, "r_"); - join.inputs.emplace_back(Declaration{ - "source", SourceNodeOptions{r_batches.schema, r_batches.gen(parallel, - /*slow=*/false)}}); - AsyncGenerator> sink_gen; + ASSERT_OK_AND_ASSIGN( + ExecNode * r_source, + MakeExecNode("source", plan.get(), {}, + SourceNodeOptions{r_batches.schema, r_batches.gen(parallel, + /*slow=*/false)})); - ASSERT_OK(Declaration::Sequence({join, {"sink", SinkNodeOptions{&sink_gen}}}) - .AddToPlan(plan.get())); + ASSERT_OK_AND_ASSIGN(ExecNode * join, MakeExecNode("hashjoin", plan.get(), + {l_source, r_source}, join_options)); + + AsyncGenerator> sink_gen; + ASSERT_OK_AND_ASSIGN( + std::ignore, MakeExecNode("sink", plan.get(), {join}, SinkNodeOptions{&sink_gen})); ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen)); + ASSERT_OK_AND_ASSIGN(*output, TableFromExecBatches(output_schema, res)); } @@ -1056,6 +1063,10 @@ TEST(HashJoin, Random) { // print num_rows, batch_size, join_type, join_cmp std::cout << join_type_name << " " << key_cmp_str << " "; key_types.Print(); + std::cout << " payload_l: "; + payload_types[0].Print(); + std::cout << " payload_r: "; + payload_types[1].Print(); std::cout << " num_rows_l = " << num_rows_l << " num_rows_r = " << num_rows_r << " batch size = " << batch_size << " parallel = " << (parallel ? "true" : "false"); diff --git a/cpp/src/arrow/compute/exec/schema_util.h b/cpp/src/arrow/compute/exec/schema_util.h index 046120714f7ff..ba14d577dc9e8 100644 --- a/cpp/src/arrow/compute/exec/schema_util.h +++ b/cpp/src/arrow/compute/exec/schema_util.h @@ -32,6 +32,18 @@ using internal::checked_cast; namespace compute { +struct SchemaProjectionMap { + static constexpr int kMissingField = -1; + int num_cols; + const int* source_to_base; + const int* base_to_target; + inline int get(int i) const { + ARROW_DCHECK(i >= 0 && i < num_cols); + ARROW_DCHECK(source_to_base[i] != kMissingField); + return base_to_target[source_to_base[i]]; + } +}; + /// Helper class for managing different projections of the same row schema. /// Used to efficiently map any field in one projection to a corresponding field in /// another projection. @@ -74,20 +86,14 @@ class SchemaProjectionMaps { return field(schema_handle, field_id).data_type; } - const int* map(ProjectionIdEnum from, ProjectionIdEnum to) { + SchemaProjectionMap map(ProjectionIdEnum from, ProjectionIdEnum to) { int id_from = schema_id(from); int id_to = schema_id(to); - int num_schemas = static_cast(schemas_.size()); - int pos = id_from * num_schemas + id_to; - const int* ptr = mapping_ptrs_[pos]; - if (!ptr) { - auto guard = mutex_.Lock(); // acquire the lock - if (!ptr) { - GenerateMap(id_from, id_to); - } - ptr = mapping_ptrs_[pos]; - } - return ptr; + SchemaProjectionMap result; + result.num_cols = num_cols(from); + result.source_to_base = mappings_[id_from].data(); + result.base_to_target = inverse_mappings_[id_to].data(); + return result; } protected: @@ -135,8 +141,12 @@ class SchemaProjectionMaps { void RegisterEnd() { size_t size = schemas_.size(); - mapping_ptrs_.resize(size * size); - mapping_bufs_.resize(size * size); + mappings_.resize(size); + inverse_mappings_.resize(size); + int id_base = 0; + for (size_t i = 0; i < size; ++i) { + GenerateMapForProjection(static_cast(i), id_base); + } } KeyEncoder::KeyColumnMetadata ColumnMetadataFromDataType( @@ -175,35 +185,46 @@ class SchemaProjectionMaps { return field_infos[field_id]; } - void GenerateMap(int id_from, int id_to) { - int num_schemas = static_cast(schemas_.size()); - int pos = id_from * num_schemas + id_to; - - int num_cols_from = static_cast(schemas_[id_from].second.size()); - int num_cols_to = static_cast(schemas_[id_to].second.size()); - mapping_bufs_[pos].resize(num_cols_from); - const std::vector& fields_from = schemas_[id_from].second; - const std::vector& fields_to = schemas_[id_to].second; - for (int i = 0; i < num_cols_from; ++i) { - int field_id = kMissingField; - for (int j = 0; j < num_cols_to; ++j) { - if (fields_from[i].field_path == fields_to[j].field_path) { - field_id = j; - // If there are multiple matches for the same input field, - // it will be mapped to the first match. - break; + void GenerateMapForProjection(int id_proj, int id_base) { + int num_cols_proj = static_cast(schemas_[id_proj].second.size()); + int num_cols_base = static_cast(schemas_[id_base].second.size()); + + std::vector& mapping = mappings_[id_proj]; + std::vector& inverse_mapping = inverse_mappings_[id_proj]; + mapping.resize(num_cols_proj); + inverse_mapping.resize(num_cols_base); + + if (id_proj == id_base) { + for (int i = 0; i < num_cols_base; ++i) { + mapping[i] = inverse_mapping[i] = i; + } + } else { + const std::vector& fields_proj = schemas_[id_proj].second; + const std::vector& fields_base = schemas_[id_base].second; + for (int i = 0; i < num_cols_base; ++i) { + inverse_mapping[i] = SchemaProjectionMap::kMissingField; + } + for (int i = 0; i < num_cols_proj; ++i) { + int field_id = SchemaProjectionMap::kMissingField; + for (int j = 0; j < num_cols_base; ++j) { + if (fields_proj[i].field_path == fields_base[j].field_path) { + field_id = j; + // If there are multiple matches for the same input field, + // it will be mapped to the first match. + break; + } } + ARROW_DCHECK(field_id != SchemaProjectionMap::kMissingField); + mapping[i] = field_id; + inverse_mapping[field_id] = i; } - mapping_bufs_[pos][i] = field_id; } - mapping_ptrs_[pos] = mapping_bufs_[pos].data(); } - std::vector mapping_ptrs_; - std::vector> mapping_bufs_; // vector used as a mapping from ProjectionIdEnum to fields std::vector>> schemas_; - util::Mutex mutex_; + std::vector> mappings_; + std::vector> inverse_mappings_; }; } // namespace compute diff --git a/cpp/src/arrow/compute/exec/task_util.cc b/cpp/src/arrow/compute/exec/task_util.cc index 5693400ae915e..e5e714d34ab0f 100644 --- a/cpp/src/arrow/compute/exec/task_util.cc +++ b/cpp/src/arrow/compute/exec/task_util.cc @@ -163,8 +163,11 @@ std::vector> TaskSchedulerImpl::PickTasks(int num_tasks, int task_group_id = static_cast((start_task_group + i) % (task_groups_.size())); TaskGroup& task_group = task_groups_[task_group_id]; - if (task_group.state_ != TaskGroupState::READY) { - continue; + { + std::lock_guard lock(mutex_); + if (task_group.state_ != TaskGroupState::READY) { + continue; + } } int num_tasks_remaining = num_tasks - static_cast(result.size()); diff --git a/cpp/src/arrow/compute/exec/util_test.cc b/cpp/src/arrow/compute/exec/util_test.cc index 9659fb2e9de92..7acf8228d6a08 100644 --- a/cpp/src/arrow/compute/exec/util_test.cc +++ b/cpp/src/arrow/compute/exec/util_test.cc @@ -45,7 +45,7 @@ TEST(FieldMap, Trivial) { auto i = schema_mgr.proj_maps[0].map(HashJoinProjection::INPUT, HashJoinProjection::OUTPUT); - EXPECT_EQ(i[0], 0); + EXPECT_EQ(i.get(0), 0); } TEST(FieldMap, TrivialDuplicates) { @@ -64,7 +64,7 @@ TEST(FieldMap, TrivialDuplicates) { auto i = schema_mgr.proj_maps[0].map(HashJoinProjection::INPUT, HashJoinProjection::OUTPUT); - EXPECT_EQ(i[0], 0); + EXPECT_EQ(i.get(0), 0); } TEST(FieldMap, SingleKeyField) { @@ -93,7 +93,7 @@ TEST(FieldMap, SingleKeyField) { auto i = schema_mgr.proj_maps[0].map(HashJoinProjection::INPUT, HashJoinProjection::OUTPUT); - EXPECT_EQ(i[0], 0); + EXPECT_EQ(i.get(0), 0); } TEST(FieldMap, TwoKeyFields) {