From 2a622460ba3c48ec9dd2b12b79e753e6214e3b7b Mon Sep 17 00:00:00 2001 From: Zuyu Zhang Date: Sun, 4 Jun 2017 23:38:25 -0700 Subject: [PATCH] Refactored the operator support for broadcast hash join. --- relational_operators/BuildHashOperator.hpp | 21 +++++++++++---- relational_operators/HashJoinOperator.cpp | 16 +++++------ relational_operators/HashJoinOperator.hpp | 31 +++++++--------------- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp index 634e1dda..50dbc052 100644 --- a/relational_operators/BuildHashOperator.hpp +++ b/relational_operators/BuildHashOperator.hpp @@ -76,7 +76,7 @@ class BuildHashOperator : public RelationalOperator { * @param join_key_attributes The IDs of equijoin attributes in * input_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param num_partitions The number of partitions in 'input_relation'. If no + * @param num_partitions The number of partitions in 'probe_relation'. If no * partitions, it is one. * @param hash_table_index The index of the JoinHashTable in QueryContext. * The HashTable's key Type(s) should be the Type(s) of the @@ -95,6 +95,7 @@ class BuildHashOperator : public RelationalOperator { join_key_attributes_(join_key_attributes), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), num_partitions_(num_partitions), + is_broadcast_join_(num_partitions > 1u && !input_relation.hasPartitionScheme()), hash_table_index_(hash_table_index), input_relation_block_ids_(num_partitions), num_workorders_generated_(num_partitions), @@ -102,12 +103,15 @@ class BuildHashOperator : public RelationalOperator { if (input_relation_is_stored) { if (input_relation.hasPartitionScheme()) { const PartitionScheme &part_scheme = *input_relation.getPartitionScheme(); - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + DCHECK_EQ(part_scheme.getPartitionSchemeHeader().getNumPartitions(), num_partitions_); + for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) { input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id); } } else { - // No partition. - input_relation_block_ids_[0] = input_relation.getBlocksSnapshot(); + // Broadcast hash join if build has no partitions. + for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) { + input_relation_block_ids_[part_id] = input_relation.getBlocksSnapshot(); + } } } } @@ -136,7 +140,13 @@ class BuildHashOperator : public RelationalOperator { void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, const partition_id part_id) override { - input_relation_block_ids_[part_id].push_back(input_block_id); + if (is_broadcast_join_) { + for (partition_id probe_part_id = 0; probe_part_id < num_partitions_; ++probe_part_id) { + input_relation_block_ids_[probe_part_id].push_back(input_block_id); + } + } else { + input_relation_block_ids_[part_id].push_back(input_block_id); + } } private: @@ -153,6 +163,7 @@ class BuildHashOperator : public RelationalOperator { const std::vector join_key_attributes_; const bool any_join_key_attributes_nullable_; const std::size_t num_partitions_; + const bool is_broadcast_join_; const QueryContext::join_hash_table_id hash_table_index_; // The index is the partition id. diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp index f8acd744..77dc8793 100644 --- a/relational_operators/HashJoinOperator.cpp +++ b/relational_operators/HashJoinOperator.cpp @@ -218,7 +218,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders( return true; } - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { const JoinHashTable &hash_table = *(query_context->getJoinHashTable(hash_table_index_, part_id)); @@ -234,7 +234,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders( started_ = true; return true; } else { - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { const JoinHashTable &hash_table = *(query_context->getJoinHashTable(hash_table_index_, part_id)); @@ -274,7 +274,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders( return true; } - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { const JoinHashTable &hash_table = *(query_context->getJoinHashTable(hash_table_index_, part_id)); @@ -290,7 +290,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders( started_ = true; return true; } else { - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { const JoinHashTable &hash_table = *(query_context->getJoinHashTable(hash_table_index_, part_id)); @@ -340,7 +340,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrderProtos( return true; } - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) { container->addWorkOrderProto( createNonOuterJoinWorkOrderProto(hash_join_type, probe_block_id, part_id), @@ -350,7 +350,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrderProtos( started_ = true; return true; } else { - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) { container->addWorkOrderProto( createNonOuterJoinWorkOrderProto(hash_join_type, @@ -402,7 +402,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer * return true; } - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) { container->addWorkOrderProto(createOuterJoinWorkOrderProto(probe_block_id, part_id), op_index_); } @@ -410,7 +410,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer * started_ = true; return true; } else { - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) { container->addWorkOrderProto( createOuterJoinWorkOrderProto(probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]], diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp index 8e9f2d75..6391847a 100644 --- a/relational_operators/HashJoinOperator.hpp +++ b/relational_operators/HashJoinOperator.hpp @@ -102,7 +102,7 @@ class HashJoinOperator : public RelationalOperator { * @param join_key_attributes The IDs of equijoin attributes in * probe_relation. * @param any_join_key_attributes_nullable If any attribute is nullable. - * @param build_num_partitions The number of partitions in 'build_relation'. + * @param num_partitions The number of partitions in 'probe_relation'. * If no partitions, it is one. * @param output_relation The output relation. * @param output_destination_index The index of the InsertDestination in the @@ -129,7 +129,7 @@ class HashJoinOperator : public RelationalOperator { const bool probe_relation_is_stored, const std::vector &join_key_attributes, const bool any_join_key_attributes_nullable, - const std::size_t build_num_partitions, + const std::size_t num_partitions, const CatalogRelation &output_relation, const QueryContext::insert_destination_id output_destination_index, const QueryContext::join_hash_table_id hash_table_index, @@ -143,7 +143,7 @@ class HashJoinOperator : public RelationalOperator { probe_relation_is_stored_(probe_relation_is_stored), join_key_attributes_(join_key_attributes), any_join_key_attributes_nullable_(any_join_key_attributes_nullable), - build_num_partitions_(build_num_partitions), + num_partitions_(num_partitions), output_relation_(output_relation), output_destination_index_(output_destination_index), hash_table_index_(hash_table_index), @@ -153,8 +153,8 @@ class HashJoinOperator : public RelationalOperator { ? std::vector() : *is_selection_on_build), join_type_(join_type), - probe_relation_block_ids_(build_num_partitions), - num_workorders_generated_(build_num_partitions), + probe_relation_block_ids_(num_partitions), + num_workorders_generated_(num_partitions), started_(false) { DCHECK(join_type != JoinType::kLeftOuterJoin || (is_selection_on_build != nullptr && @@ -163,15 +163,12 @@ class HashJoinOperator : public RelationalOperator { if (probe_relation_is_stored) { if (probe_relation.hasPartitionScheme()) { const PartitionScheme &part_scheme = *probe_relation.getPartitionScheme(); - DCHECK_EQ(build_num_partitions_, part_scheme.getPartitionSchemeHeader().getNumPartitions()); - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { + for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) { probe_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id); } } else { - // Broadcast hash join if probe has no partitions. - for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) { - probe_relation_block_ids_[part_id] = probe_relation.getBlocksSnapshot(); - } + // No partitions. + probe_relation_block_ids_[0] = probe_relation.getBlocksSnapshot(); } } } @@ -227,15 +224,7 @@ class HashJoinOperator : public RelationalOperator { void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, const partition_id part_id) override { DCHECK_EQ(probe_relation_.getID(), input_relation_id); - - if (probe_relation_.hasPartitionScheme()) { - probe_relation_block_ids_[part_id].push_back(input_block_id); - } else { - // Broadcast hash join if probe has no partitions. - for (std::size_t build_part_id = 0; build_part_id < build_num_partitions_; ++build_part_id) { - probe_relation_block_ids_[build_part_id].push_back(input_block_id); - } - } + probe_relation_block_ids_[part_id].push_back(input_block_id); } QueryContext::insert_destination_id getInsertDestinationID() const override { @@ -287,7 +276,7 @@ class HashJoinOperator : public RelationalOperator { const bool probe_relation_is_stored_; const std::vector join_key_attributes_; const bool any_join_key_attributes_nullable_; - const std::size_t build_num_partitions_; + const std::size_t num_partitions_; const CatalogRelation &output_relation_; const QueryContext::insert_destination_id output_destination_index_; const QueryContext::join_hash_table_id hash_table_index_;