Skip to content

Commit

Permalink
Use Predicates with Uncorrelated Subqueries for Dynamic Pruning (#2588)
Browse files Browse the repository at this point in the history
Prunes tables using Predicates that contain uncorrelated subquery results. The information about such Predicates is added to StoredTableNodes by the ChunkPruningRule and later to GetTable operators by the LQPTranslator. Deep copies of LQPs and PQPs preserve the information.
  • Loading branch information
dey4ss committed Aug 25, 2023
1 parent 6b3db39 commit 46f13fa
Show file tree
Hide file tree
Showing 19 changed files with 774 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ set(
utils/lossless_predicate_cast.cpp
utils/lossless_predicate_cast.hpp
utils/make_bimap.hpp
utils/map_prunable_subquery_predicates.hpp
utils/meta_table_manager.cpp
utils/meta_table_manager.hpp
utils/meta_tables/abstract_meta_table.cpp
Expand Down
6 changes: 3 additions & 3 deletions src/lib/expression/lqp_subquery_expression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ bool LQPSubqueryExpression::_shallow_equals(const AbstractExpression& expression
}

size_t LQPSubqueryExpression::_shallow_hash() const {
// Return 0, thus forcing a hash collision for LQPSubqueryExpressions and triggering a full equality check.
// TODO(moritz) LQP hashing will be introduced with the JoinOrdering optimizer, until then we live with these
// collisions
// Return AbstractExpression::_shallow_hash() (i.e., 0), thus forcing a hash collision for LQPSubqueryExpressions and
// triggering a full equality check. Though we often hash entire query plans, we expect most plans to contain only few
// LQPSubqueryExpressions. Thus, these hash collisions should be fine.
return AbstractExpression::_shallow_hash();
}

Expand Down
10 changes: 9 additions & 1 deletion src/lib/logical_query_plan/abstract_lqp_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "predicate_node.hpp"
#include "update_node.hpp"
#include "utils/assert.hpp"
#include "utils/map_prunable_subquery_predicates.hpp"
#include "utils/print_utils.hpp"

namespace {
Expand Down Expand Up @@ -235,7 +236,14 @@ size_t AbstractLQPNode::output_count() const {
}

std::shared_ptr<AbstractLQPNode> AbstractLQPNode::deep_copy(LQPNodeMapping node_mapping) const {
return _deep_copy_impl(node_mapping);
const auto copy = _deep_copy_impl(node_mapping);

// StoredTableNodes can store references to PredicateNodes as prunable subquery predicates (see get_table.hpp for
// details). We must assign the copies of these PredicateNodes after copying the entire LQP (see
// map_prunable_subquery_predicates.hpp).
map_prunable_subquery_predicates(node_mapping);

return copy;
}

bool AbstractLQPNode::shallow_equals(const AbstractLQPNode& rhs, const LQPNodeMapping& node_mapping) const {
Expand Down
11 changes: 10 additions & 1 deletion src/lib/logical_query_plan/lqp_translator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,20 @@
#include "stored_table_node.hpp"
#include "union_node.hpp"
#include "update_node.hpp"
#include "utils/map_prunable_subquery_predicates.hpp"
#include "utils/pruning_utils.hpp"

namespace hyrise {

std::shared_ptr<AbstractOperator> LQPTranslator::translate_node(const std::shared_ptr<AbstractLQPNode>& node) const {
return _translate_node_recursively(node);
const auto pqp = _translate_node_recursively(node);

// StoredTableNodes can store references to PredicateNodes as prunable subquery predicates (see get_table.hpp for
// details). We must assign the TableScans translated from these PredicateNodes after translating the entire LQP (see
// map_prunable_subquery_predicates.hpp).
map_prunable_subquery_predicates(_operator_by_lqp_node);

return pqp;
}

std::shared_ptr<AbstractOperator> LQPTranslator::_translate_node_recursively(
Expand Down
57 changes: 54 additions & 3 deletions src/lib/logical_query_plan/stored_table_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ const std::vector<ColumnID>& StoredTableNode::pruned_column_ids() const {
return _pruned_column_ids;
}

void StoredTableNode::set_prunable_subquery_predicates(
const std::vector<std::weak_ptr<AbstractLQPNode>>& predicate_nodes) {
DebugAssert(std::all_of(predicate_nodes.cbegin(), predicate_nodes.cend(),
[](const auto& node) { return node.lock() && node.lock()->type == LQPNodeType::Predicate; }),
"No PredicateNode set as prunable predicate.");
_prunable_subquery_predicates = predicate_nodes;
}

std::vector<std::shared_ptr<AbstractLQPNode>> StoredTableNode::prunable_subquery_predicates() const {
auto subquery_predicates = std::vector<std::shared_ptr<AbstractLQPNode>>{};
subquery_predicates.reserve(_prunable_subquery_predicates.size());
for (const auto& subquery_predicate_ref : _prunable_subquery_predicates) {
const auto& subquery_predicate = subquery_predicate_ref.lock();
Assert(subquery_predicate, "Referenced PredicateNode expired. LQP is invalid.");
subquery_predicates.emplace_back(subquery_predicate);
}
return subquery_predicates;
}

std::string StoredTableNode::description(const DescriptionMode /*mode*/) const {
const auto& stored_table = Hyrise::get().storage_manager.get_table(table_name);

Expand Down Expand Up @@ -193,20 +212,52 @@ size_t StoredTableNode::_on_shallow_hash() const {
for (const auto& pruned_column_id : _pruned_column_ids) {
boost::hash_combine(hash, static_cast<size_t>(pruned_column_id));
}
// We intentionally force a hash collision for StoredTableNodes with the same number of prunable subquery predicates
// even though these predicates are different. Since we assume that (i) these predicates are not often set and (ii) we
// hash LQPs often, this reduces the hash overhead, makes the code simpler, and triggers an in-depth equality check
// for the rare cases with (the same number of) prunable subquery predicates.
boost::hash_combine(hash, _prunable_subquery_predicates.size());
return hash;
}

std::shared_ptr<AbstractLQPNode> StoredTableNode::_on_shallow_copy(LQPNodeMapping& /*node_mapping*/) const {
// We cannot copy _prunable_subquery_predicated here since deep_copy() recurses into the input nodes and the
// StoredTableNodes are the first ones to be copied. Instead, AbstractLQPNode::deep_copy() sets the copied
// PredicateNodes after the entire LQP has been copied.
const auto copy = make(table_name);
copy->set_pruned_chunk_ids(_pruned_chunk_ids);
copy->set_pruned_column_ids(_pruned_column_ids);
return copy;
}

bool StoredTableNode::_on_shallow_equals(const AbstractLQPNode& rhs, const LQPNodeMapping& /*node_mapping*/) const {
bool StoredTableNode::_on_shallow_equals(const AbstractLQPNode& rhs, const LQPNodeMapping& node_mapping) const {
const auto& stored_table_node = static_cast<const StoredTableNode&>(rhs);
return table_name == stored_table_node.table_name && _pruned_chunk_ids == stored_table_node._pruned_chunk_ids &&
_pruned_column_ids == stored_table_node._pruned_column_ids;
if (table_name != stored_table_node.table_name || _pruned_chunk_ids != stored_table_node._pruned_chunk_ids ||
_pruned_column_ids != stored_table_node._pruned_column_ids) {
return false;
}

// Check equality of prunable subquery predicates. For now, the order of the predicates matters. Though this is a
// missed opportunity for LQP deduplication, we do not consider this a problem for now.
const auto& prunable_subquery_predicates = this->prunable_subquery_predicates();
const auto& rhs_prunable_subquery_predicates = stored_table_node.prunable_subquery_predicates();
const auto subquery_predicate_count = prunable_subquery_predicates.size();

if (subquery_predicate_count != rhs_prunable_subquery_predicates.size()) {
return false;
}

for (auto predicate_idx = size_t{0}; predicate_idx < subquery_predicate_count; ++predicate_idx) {
// We cannot check that the PredicateNodes are equal since this equality check recurses into the inputs und we do
// not terminate. We have to compare the predicate expressions.
if (!expressions_equal_to_expressions_in_different_lqp(
prunable_subquery_predicates[predicate_idx]->node_expressions,
rhs_prunable_subquery_predicates[predicate_idx]->node_expressions, node_mapping)) {
return false;
}
}

return true;
}

void StoredTableNode::_set_output_expressions() const {
Expand Down
9 changes: 8 additions & 1 deletion src/lib/logical_query_plan/stored_table_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ class StoredTableNode : public EnableMakeForLQPNode<StoredTableNode>, public Abs

void set_pruned_column_ids(const std::vector<ColumnID>& pruned_column_ids);
const std::vector<ColumnID>& pruned_column_ids() const;

// We cannot use predicates with uncorrelated subqueries to get pruned ChunkIDs during optimization. However, we can
// reference these predicates and keep track of them in the plan. Once we execute the plan, the subqueries might have
// already been executed, so we can use them for pruning during execution.
void set_prunable_subquery_predicates(const std::vector<std::weak_ptr<AbstractLQPNode>>& predicate_nodes);
std::vector<std::shared_ptr<AbstractLQPNode>> prunable_subquery_predicates() const;
/** @} */

std::vector<ChunkIndexStatistics> chunk_indexes_statistics() const;
Expand All @@ -55,14 +61,15 @@ class StoredTableNode : public EnableMakeForLQPNode<StoredTableNode>, public Abs
protected:
size_t _on_shallow_hash() const override;
std::shared_ptr<AbstractLQPNode> _on_shallow_copy(LQPNodeMapping& /*node_mapping*/) const override;
bool _on_shallow_equals(const AbstractLQPNode& rhs, const LQPNodeMapping& /*node_mapping*/) const override;
bool _on_shallow_equals(const AbstractLQPNode& rhs, const LQPNodeMapping& node_mapping) const override;

void _set_output_expressions() const;

private:
mutable std::optional<std::vector<std::shared_ptr<AbstractExpression>>> _output_expressions;
std::vector<ChunkID> _pruned_chunk_ids;
std::vector<ColumnID> _pruned_column_ids;
std::vector<std::weak_ptr<AbstractLQPNode>> _prunable_subquery_predicates;
};

} // namespace hyrise
10 changes: 9 additions & 1 deletion src/lib/operators/abstract_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "utils/assert.hpp"
#include "utils/format_bytes.hpp"
#include "utils/format_duration.hpp"
#include "utils/map_prunable_subquery_predicates.hpp"
#include "utils/print_utils.hpp"
#include "utils/timer.hpp"

Expand Down Expand Up @@ -199,7 +200,14 @@ std::string AbstractOperator::description(DescriptionMode /*description_mode*/)

std::shared_ptr<AbstractOperator> AbstractOperator::deep_copy() const {
auto copied_ops = std::unordered_map<const AbstractOperator*, std::shared_ptr<AbstractOperator>>{};
return deep_copy(copied_ops);
const auto copy = deep_copy(copied_ops);

// GetTable operators can store references to TableScans as prunable subquery predicates (see get_table.hpp for
// details). We must assign the copies of these TableScans after copying the entire PQP (see
// map_prunable_subquery_predicates.hpp).
map_prunable_subquery_predicates(copied_ops);

return copy;
}

std::shared_ptr<AbstractOperator> AbstractOperator::deep_copy(
Expand Down
1 change: 1 addition & 0 deletions src/lib/operators/abstract_operator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ class AbstractOperator : public std::enable_shared_from_this<AbstractOperator>,
std::unique_ptr<AbstractOperatorPerformanceData> performance_data;

protected:
friend class OperatorTaskTest;
// abstract method to actually execute the operator
// execute and get_output are split into two methods to allow for easier
// asynchronous execution
Expand Down
125 changes: 121 additions & 4 deletions src/lib/operators/get_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <sstream>
#include <unordered_set>

#include "expression/expression_functional.hpp"
#include "hyrise.hpp"
#include "logical_query_plan/predicate_node.hpp"
#include "logical_query_plan/stored_table_node.hpp"
Expand All @@ -14,6 +15,8 @@

namespace hyrise {

using namespace expression_functional; // NOLINT(build/namespaces)

GetTable::GetTable(const std::string& name) : GetTable{name, {}, {}} {}

GetTable::GetTable(const std::string& name, const std::vector<ChunkID>& pruned_chunk_ids,
Expand Down Expand Up @@ -47,7 +50,16 @@ std::string GetTable::description(DescriptionMode description_mode) const {
stream << AbstractOperator::description(description_mode) << separator;
stream << "(" << table_name() << ")" << separator;
stream << "pruned:" << separator;
stream << _pruned_chunk_ids.size() << "/" << stored_table->chunk_count() << " chunk(s)";
auto overall_pruned_chunk_ids = _dynamically_pruned_chunk_ids;
overall_pruned_chunk_ids.insert(_pruned_chunk_ids.begin(), _pruned_chunk_ids.end());
const auto overall_pruned_chunk_count = overall_pruned_chunk_ids.size();
const auto dynamically_pruned_chunk_count = overall_pruned_chunk_count - _pruned_chunk_ids.size();

stream << overall_pruned_chunk_count << "/" << stored_table->chunk_count() << " chunk(s)";
if (overall_pruned_chunk_count > 0) {
stream << " (" << _pruned_chunk_ids.size() << " static, " << dynamically_pruned_chunk_count << " dynamic)";
}

if (description_mode == DescriptionMode::SingleLine) {
stream << ",";
}
Expand All @@ -69,10 +81,33 @@ const std::vector<ColumnID>& GetTable::pruned_column_ids() const {
return _pruned_column_ids;
}

void GetTable::set_prunable_subquery_predicates(
const std::vector<std::weak_ptr<const AbstractOperator>>& subquery_scans) const {
DebugAssert(std::all_of(subquery_scans.cbegin(), subquery_scans.cend(),
[](const auto& op) { return op.lock() && op.lock()->type() == OperatorType::TableScan; }),
"No TableScan set as prunable predicate.");

_prunable_subquery_scans = subquery_scans;
}

std::vector<std::shared_ptr<const AbstractOperator>> GetTable::prunable_subquery_predicates() const {
auto subquery_scans = std::vector<std::shared_ptr<const AbstractOperator>>{};
subquery_scans.reserve(_prunable_subquery_scans.size());
for (const auto& subquery_scan_ref : _prunable_subquery_scans) {
const auto& subquery_scan = subquery_scan_ref.lock();
Assert(subquery_scan, "Referenced TableScan expired. PQP is invalid.");
subquery_scans.emplace_back(subquery_scan);
}
return subquery_scans;
}

std::shared_ptr<AbstractOperator> GetTable::_on_deep_copy(
const std::shared_ptr<AbstractOperator>& /*copied_left_input*/,
const std::shared_ptr<AbstractOperator>& /*copied_right_input*/,
std::unordered_map<const AbstractOperator*, std::shared_ptr<AbstractOperator>>& /*copied_ops*/) const {
// We cannot copy _prunable_subquery_scans here since deep_copy() recurses into the input operators and the GetTable
// operators are the first ones to be copied. Instead, AbstractOperator::deep_copy() sets the copied TableScans after
// the whole PQP has been copied.
return std::make_shared<GetTable>(_name, _pruned_chunk_ids, _pruned_column_ids);
}

Expand Down Expand Up @@ -103,11 +138,13 @@ std::shared_ptr<const Table> GetTable::_on_execute() {
// Currently, value_clustered_by is only used for temporary tables. If tables in the StorageManager start using that
// flag, too, it needs to be forwarded here; otherwise it would be completely invisible in the PQP.
DebugAssert(stored_table->value_clustered_by().empty(), "GetTable does not forward value_clustered_by");
auto overall_pruned_chunk_ids = _prune_chunks_dynamically();
overall_pruned_chunk_ids.insert(_pruned_chunk_ids.cbegin(), _pruned_chunk_ids.cend());
auto pruned_chunk_ids_iter = overall_pruned_chunk_ids.begin();
auto excluded_chunk_ids = std::vector<ChunkID>{};
auto pruned_chunk_ids_iter = _pruned_chunk_ids.begin();
for (ChunkID stored_chunk_id{0}; stored_chunk_id < chunk_count; ++stored_chunk_id) {
for (auto stored_chunk_id = ChunkID{0}; stored_chunk_id < chunk_count; ++stored_chunk_id) {
// Check whether the Chunk is pruned
if (pruned_chunk_ids_iter != _pruned_chunk_ids.end() && *pruned_chunk_ids_iter == stored_chunk_id) {
if (pruned_chunk_ids_iter != overall_pruned_chunk_ids.end() && *pruned_chunk_ids_iter == stored_chunk_id) {
++pruned_chunk_ids_iter;
excluded_chunk_ids.emplace_back(stored_chunk_id);
continue;
Expand Down Expand Up @@ -277,4 +314,84 @@ std::shared_ptr<const Table> GetTable::_on_execute() {
stored_table->uses_mvcc(), table_indexes);
}

std::set<ChunkID> GetTable::_prune_chunks_dynamically() {
if (_prunable_subquery_scans.empty()) {
return {};
}

// Create a dummy PredicateNode for each predicate containing a subquery that has already been executed. We do not use
// the original predicate to ignore all other nodes between the StoredTableNode and the PredicateNodes. Since the
// ChunkPruningRule already took care to add only predicates that are safe to prune with, we can act as if there were
// no other LQP nodes.
auto prunable_predicate_nodes = std::vector<std::shared_ptr<PredicateNode>>{};
prunable_predicate_nodes.reserve(_prunable_subquery_scans.size());

// Create a dummy StoredTableNode from the table to retrieve. `compute_chunk_exclude_list` modifies the node's
// statistics and we want to avoid that. We cannot use `deep_copy()` here since it would complain that the referenced
// prunable PredicateNodes are not part of the LQP.
const auto& stored_table_node = static_cast<const StoredTableNode&>(*lqp_node);
const auto dummy_stored_table_node = StoredTableNode::make(_name);

for (const auto& op : prunable_subquery_predicates()) {
const auto& table_scan = static_cast<const TableScan&>(*op);
const auto& operator_predicate_arguments = table_scan.predicate()->arguments;
const auto& predicate_node = static_cast<const PredicateNode&>(*table_scan.lqp_node);
const auto adjusted_predicate = predicate_node.predicate()->deep_copy();
auto& arguments = adjusted_predicate->arguments;
const auto argument_count = adjusted_predicate->arguments.size();

// Adjust predicates with the dummy StoredTableNode and the subquery result, if available.
for (auto expression_idx = size_t{0}; expression_idx < argument_count; ++expression_idx) {
auto& argument = arguments[expression_idx];
// Replace any column with the respective column from our dummy StoredTableNode.
if (const auto lqp_column = std::dynamic_pointer_cast<LQPColumnExpression>(argument)) {
Assert(*lqp_column->original_node.lock() == stored_table_node,
"Predicate is performed on wrong StoredTableNode.");
argument = lqp_column_(dummy_stored_table_node, lqp_column->original_column_id);
continue;
}

// Check if expression is an uncorrelated subquery.
if (argument->type != ExpressionType::LQPSubquery) {
continue;
}
Assert(operator_predicate_arguments[expression_idx]->type == ExpressionType::PQPSubquery,
"Cannot resolve PQPSubqueryExpression.");
const auto& subquery = static_cast<PQPSubqueryExpression&>(*operator_predicate_arguments[expression_idx]);
if (subquery.is_correlated()) {
continue;
}

// It might happen that scheduling the subquery before the GetTable operator would create a cycle. For instance,
// this can happen for a query like this: SELECT * FROM a_table WHERE x > (SELECT AVG(x) FROM a_table);
// The PQP of the query could look like the following:
//
// [TableScan] x > SUBQUERY
// | *
// | * uncorrelated subquery
// | *
// | [AggregateHash] AVG(x)
// | /
// [GetTable] a_table
//
// We cannot schedule the AggregateHash operator before the GetTable operator to obtain the subquery result for
// pruning: the OperatorTasks wrapping both operators would be in a circular wait for each other. We simply avoid
// this circular wait by StoredTableNodes using their prunable_subquery_predicates for equality checks. Thus, the
// LQPTranslator creates two GetTable operators rather than deduplicating them. resolve_uncorrelated_subquery()
// asserts that the subquery has already been executed.
argument = value_(resolve_uncorrelated_subquery(subquery.pqp));
}

// Add a new PredicateNode to the pruning chain.
auto input_node = static_pointer_cast<AbstractLQPNode>(dummy_stored_table_node);
if (!prunable_predicate_nodes.empty()) {
input_node = prunable_predicate_nodes.back();
}
prunable_predicate_nodes.emplace_back(PredicateNode::make(adjusted_predicate, input_node));
}

_dynamically_pruned_chunk_ids = compute_chunk_exclude_list(prunable_predicate_nodes, dummy_stored_table_node);
return _dynamically_pruned_chunk_ids;
}

} // namespace hyrise
Loading

0 comments on commit 46f13fa

Please sign in to comment.