Skip to content

Commit

Permalink
Generate In-Clause filters from hash joins (#14864)
Browse files Browse the repository at this point in the history
Follow-up from #12908


This PR extends the join filters that get generated by hash joins to
include an `IN` filter when the hash table is small enough. Rather than
generating only a `min-max` filter, we generate an `IN` filter with
**all** values in the hash table. This can greatly improve performance
over the `min-max` filter when there are few values that are far apart.
For example, if the hash table contains the values `1` and `100000`, the
`IN` filter might be much more effective as we can prune many more row
groups.

The threshold for which we generate an `IN` filter is determined by the
`dynamic_or_filter_threshold` setting, and defaults to 50 rows. The `IN`
filter is pushed as an `OPTIONAL_FILTER`, which is currently only
evaluated for zone-map pruning. As a result, the performance impact of
pushing this filter is minimal, while the performance improvement from
extra zone-map pruning can be significant.

### Benchmark

Below is a benchmark that we run over TPC-H SF10 in which we run a join
that is generated through an `IN` clause - we join on the `min` and
`max` values of `l_orderkey` (meaning the generated min/max filters will
not be effective in pruning rows).

```sql
SELECT *
FROM lineitem
WHERE l_orderkey IN (
    SELECT UNNEST([MIN(l_orderkey), MAX(l_orderkey)])
    FROM lineitem)
ORDER BY ALL;
```

| v1.1  |  New  |
|-------|-------|
| 0.22s | 0.04s |
  • Loading branch information
Mytherin authored Nov 18, 2024
2 parents 339435d + a6659f1 commit b470dea
Show file tree
Hide file tree
Showing 31 changed files with 476 additions and 186 deletions.
27 changes: 27 additions & 0 deletions benchmark/tpch/join/join_or_filter_pushdown.benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# name: benchmark/tpch/join/join_or_filter_pushdown.benchmark
# description: Join filter pushdown
# group: [join]

name Join Or Filter Pushdown
group join
subgroup tpch

require tpch

cache tpch_sf1.duckdb

load
CALL dbgen(sf=1);

run
SELECT * from lineitem WHERE l_orderkey IN (SELECT UNNEST([MIN(l_orderkey), MAX(l_orderkey)]) FROM lineitem) ORDER BY ALL

result IIIIIIIIIIIIIIII
1 2132 4633 4 28.00 28955.64 0.09 0.06 N O 1996-04-21 1996-03-30 1996-05-16 NONE AIR s cajole busily above t
1 15635 638 6 32.00 49620.16 0.07 0.02 N O 1996-01-30 1996-02-07 1996-02-03 DELIVER IN PERSON MAIL rouches. special
1 24027 1534 5 24.00 22824.48 0.10 0.04 N O 1996-03-30 1996-03-14 1996-04-01 NONE FOB the regular, regular pa
1 63700 3701 3 8.00 13309.60 0.10 0.02 N O 1996-01-29 1996-03-05 1996-01-31 TAKE BACK RETURN REG AIR ourts cajole above the furiou
1 67310 7311 2 36.00 45983.16 0.09 0.06 N O 1996-04-12 1996-02-28 1996-04-20 TAKE BACK RETURN MAIL according to the final foxes. qui
1 155190 7706 1 17.00 21168.23 0.04 0.02 N O 1996-03-13 1996-02-12 1996-03-22 DELIVER IN PERSON TRUCK to beans x-ray carefull
6000000 32255 2256 1 5.00 5936.25 0.04 0.03 N O 1996-11-02 1996-11-19 1996-12-01 TAKE BACK RETURN MAIL riously pe
6000000 96127 6128 2 28.00 31447.36 0.01 0.02 N O 1996-09-22 1996-10-01 1996-10-21 NONE AIR pecial excuses nag evenly f
18 changes: 18 additions & 0 deletions benchmark/tpch/join/join_or_filter_range.benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# name: benchmark/tpch/join/join_or_filter_range.benchmark
# description: Join filter pushdown
# group: [join]

name Join Or Filter Pushdown
group join
subgroup tpch

require tpch

cache tpch_sf1.duckdb

load
CALL dbgen(sf=1);

run
SELECT * from lineitem WHERE l_orderkey IN (SELECT * FROM range(50)) ORDER BY ALL

7 changes: 4 additions & 3 deletions src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3581,19 +3581,20 @@ const StringUtil::EnumStringLiteral *GetTableFilterTypeValues() {
{ static_cast<uint32_t>(TableFilterType::CONJUNCTION_OR), "CONJUNCTION_OR" },
{ static_cast<uint32_t>(TableFilterType::CONJUNCTION_AND), "CONJUNCTION_AND" },
{ static_cast<uint32_t>(TableFilterType::STRUCT_EXTRACT), "STRUCT_EXTRACT" },
{ static_cast<uint32_t>(TableFilterType::OPTIONAL_FILTER), "OPTIONAL_FILTER" }
{ static_cast<uint32_t>(TableFilterType::OPTIONAL_FILTER), "OPTIONAL_FILTER" },
{ static_cast<uint32_t>(TableFilterType::IN_FILTER), "IN_FILTER" }
};
return values;
}

template<>
const char* EnumUtil::ToChars<TableFilterType>(TableFilterType value) {
return StringUtil::EnumToString(GetTableFilterTypeValues(), 7, "TableFilterType", static_cast<uint32_t>(value));
return StringUtil::EnumToString(GetTableFilterTypeValues(), 8, "TableFilterType", static_cast<uint32_t>(value));
}

template<>
TableFilterType EnumUtil::FromString<TableFilterType>(const char *value) {
return static_cast<TableFilterType>(StringUtil::StringToEnum(GetTableFilterTypeValues(), 7, "TableFilterType", value));
return static_cast<TableFilterType>(StringUtil::StringToEnum(GetTableFilterTypeValues(), 8, "TableFilterType", value));
}

const StringUtil::EnumStringLiteral *GetTablePartitionInfoValues() {
Expand Down
57 changes: 0 additions & 57 deletions src/common/row_operations/row_gather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,61 +178,4 @@ void RowOperations::Gather(Vector &rows, const SelectionVector &row_sel, Vector
}
}

template <class T>
static void TemplatedFullScanLoop(Vector &rows, Vector &col, idx_t count, idx_t col_offset, idx_t col_no,
idx_t column_count) {
// Precompute mask indexes
idx_t entry_idx;
idx_t idx_in_entry;
ValidityBytes::GetEntryIndex(col_no, entry_idx, idx_in_entry);

auto ptrs = FlatVector::GetData<data_ptr_t>(rows);
auto data = FlatVector::GetData<T>(col);
// auto &col_mask = FlatVector::Validity(col);

for (idx_t i = 0; i < count; i++) {
auto row = ptrs[i];
data[i] = Load<T>(row + col_offset);
ValidityBytes row_mask(row, column_count);
if (!row_mask.RowIsValid(row_mask.GetValidityEntry(entry_idx), idx_in_entry)) {
throw InternalException("Null value comparisons not implemented for perfect hash table yet");
// col_mask.SetInvalid(i);
}
}
}

void RowOperations::FullScanColumn(const TupleDataLayout &layout, Vector &rows, Vector &col, idx_t count,
idx_t col_no) {
const auto col_offset = layout.GetOffsets()[col_no];
col.SetVectorType(VectorType::FLAT_VECTOR);
switch (col.GetType().InternalType()) {
case PhysicalType::UINT8:
TemplatedFullScanLoop<uint8_t>(rows, col, count, col_offset, col_no, layout.ColumnCount());
break;
case PhysicalType::UINT16:
TemplatedFullScanLoop<uint16_t>(rows, col, count, col_offset, col_no, layout.ColumnCount());
break;
case PhysicalType::UINT32:
TemplatedFullScanLoop<uint32_t>(rows, col, count, col_offset, col_no, layout.ColumnCount());
break;
case PhysicalType::UINT64:
TemplatedFullScanLoop<uint64_t>(rows, col, count, col_offset, col_no, layout.ColumnCount());
break;
case PhysicalType::INT8:
TemplatedFullScanLoop<int8_t>(rows, col, count, col_offset, col_no, layout.ColumnCount());
break;
case PhysicalType::INT16:
TemplatedFullScanLoop<int16_t>(rows, col, count, col_offset, col_no, layout.ColumnCount());
break;
case PhysicalType::INT32:
TemplatedFullScanLoop<int32_t>(rows, col, count, col_offset, col_no, layout.ColumnCount());
break;
case PhysicalType::INT64:
TemplatedFullScanLoop<int64_t>(rows, col, count, col_offset, col_no, layout.ColumnCount());
break;
default:
throw NotImplementedException("Unimplemented type for RowOperations::FullScanColumn");
}
}

} // namespace duckdb
6 changes: 6 additions & 0 deletions src/common/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@
"custom_implementation": true,
"struct": "DuckDBAPISetting"
},
{
"name": "dynamic_or_filter_threshold",
"description": "The maximum amount of OR filters we generate dynamically from a hash join",
"type": "UBIGINT",
"scope": "local"
},
{
"name": "enable_external_access",
"description": "Allow the database to access external state (through e.g. loading/installing modules, COPY TO/FROM, CSV \"\n\t \"readers, pandas replacement scans, etc)",
Expand Down
3 changes: 2 additions & 1 deletion src/execution/operator/join/perfect_hash_join_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ bool PerfectHashJoinExecutor::FullScanHashTable(LogicalType &key_type) {

// Scan the build keys in the hash table
Vector build_vector(key_type, key_count);
RowOperations::FullScanColumn(ht.layout, tuples_addresses, build_vector, key_count, 0);
data_collection.Gather(tuples_addresses, *FlatVector::IncrementalSelectionVector(), key_count, 0, build_vector,
*FlatVector::IncrementalSelectionVector(), nullptr);

// Now fill the selection vector using the build keys and create a sequential vector
// TODO: add check for fast pass when probe is part of build domain
Expand Down
60 changes: 57 additions & 3 deletions src/execution/operator/join/physical_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#include "duckdb/common/radix_partitioning.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/execution/operator/aggregate/ungrouped_aggregate_state.hpp"
#include "duckdb/function/aggregate/distributive_functions.hpp"
#include "duckdb/function/aggregate/distributive_function_utils.hpp"
#include "duckdb/function/aggregate/distributive_functions.hpp"
#include "duckdb/function/function_binder.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/main/query_profiler.hpp"
Expand All @@ -15,12 +15,17 @@
#include "duckdb/parallel/thread_context.hpp"
#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"
#include "duckdb/planner/filter/conjunction_filter.hpp"
#include "duckdb/planner/filter/constant_filter.hpp"
#include "duckdb/planner/filter/in_filter.hpp"
#include "duckdb/planner/filter/null_filter.hpp"
#include "duckdb/planner/filter/optional_filter.hpp"
#include "duckdb/planner/table_filter.hpp"
#include "duckdb/storage/buffer_manager.hpp"
#include "duckdb/storage/storage_manager.hpp"
#include "duckdb/storage/temporary_memory_manager.hpp"
#include "duckdb/common/types/value_map.hpp"
#include "duckdb/optimizer/filter_combiner.hpp"

namespace duckdb {

Expand Down Expand Up @@ -567,7 +572,50 @@ class HashJoinRepartitionEvent : public BasePipelineEvent {
}
};

void JoinFilterPushdownInfo::PushFilters(JoinFilterGlobalState &gstate, const PhysicalOperator &op) const {
void JoinFilterPushdownInfo::PushInFilter(const JoinFilterPushdownFilter &info, JoinHashTable &ht,
const PhysicalOperator &op, idx_t filter_idx, idx_t filter_col_idx) const {
// generate a "OR" filter (i.e. x=1 OR x=535 OR x=997)
// first scan the entire vector at the probe side
// FIXME: this code is duplicated from PerfectHashJoinExecutor::FullScanHashTable
auto build_idx = join_condition[filter_idx];
auto &data_collection = ht.GetDataCollection();

Vector tuples_addresses(LogicalType::POINTER, ht.Count()); // allocate space for all the tuples

JoinHTScanState join_ht_state(data_collection, 0, data_collection.ChunkCount(),
TupleDataPinProperties::KEEP_EVERYTHING_PINNED);

// Go through all the blocks and fill the keys addresses
idx_t key_count = ht.FillWithHTOffsets(join_ht_state, tuples_addresses);

// Scan the build keys in the hash table
Vector build_vector(ht.layout.GetTypes()[build_idx], key_count);
data_collection.Gather(tuples_addresses, *FlatVector::IncrementalSelectionVector(), key_count, build_idx,
build_vector, *FlatVector::IncrementalSelectionVector(), nullptr);

// generate the OR-clause - note that we only need to consider unique values here (so we use a seT)
value_set_t unique_ht_values;
for (idx_t k = 0; k < key_count; k++) {
unique_ht_values.insert(build_vector.GetValue(k));
}
vector<Value> in_list(unique_ht_values.begin(), unique_ht_values.end());

// generating the OR filter only makes sense if the range is not dense
// i.e. if we have the values [0, 1, 2, 3, 4] - the min/max is fully equivalent to the OR filter
if (FilterCombiner::IsDenseRange(in_list)) {
return;
}

// generate the OR filter
auto or_filter = make_uniq<InFilter>(std::move(in_list));
// we push the OR filter as an OptionalFilter so that we can use it for zonemap pruning only
// the IN-list is expensive to execute otherwise
auto filter = make_uniq<OptionalFilter>(std::move(or_filter));
info.dynamic_filters->PushFilter(op, filter_col_idx, std::move(filter));
}

void JoinFilterPushdownInfo::PushFilters(ClientContext &context, JoinHashTable &ht, JoinFilterGlobalState &gstate,
const PhysicalOperator &op) const {
// finalize the min/max aggregates
vector<LogicalType> min_max_types;
for (auto &aggr_expr : min_max_aggregates) {
Expand All @@ -578,6 +626,7 @@ void JoinFilterPushdownInfo::PushFilters(JoinFilterGlobalState &gstate, const Ph

gstate.global_aggregate_state->Finalize(final_min_max);

auto dynamic_or_filter_threshold = ClientConfig::GetSetting<DynamicOrFilterThresholdSetting>(context);
// create a filter for each of the aggregates
for (idx_t filter_idx = 0; filter_idx < join_condition.size(); filter_idx++) {
for (auto &info : probe_info) {
Expand All @@ -593,6 +642,11 @@ void JoinFilterPushdownInfo::PushFilters(JoinFilterGlobalState &gstate, const Ph
// hash table e.g. because they are part of a RIGHT join
continue;
}
// if the HT is small we can generate a complete "OR" filter
if (ht.Count() > 1 && ht.Count() <= dynamic_or_filter_threshold) {
PushInFilter(info, ht, op, filter_idx, filter_col_idx);
}

if (Value::NotDistinctFrom(min_val, max_val)) {
// min = max - generate an equality filter
auto constant_filter = make_uniq<ConstantFilter>(ExpressionType::COMPARE_EQUAL, std::move(min_val));
Expand Down Expand Up @@ -655,7 +709,7 @@ SinkFinalizeType PhysicalHashJoin::Finalize(Pipeline &pipeline, Event &event, Cl
ht.Unpartition();

if (filter_pushdown && ht.Count() > 0) {
filter_pushdown->PushFilters(*sink.global_filter_state, *this);
filter_pushdown->PushFilters(context, ht, *sink.global_filter_state, *this);
}

// check for possible perfect hash table
Expand Down
8 changes: 8 additions & 0 deletions src/include/duckdb/common/array_ptr.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// duckdb/common/array_ptr.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/common/exception.hpp"
Expand Down
2 changes: 0 additions & 2 deletions src/include/duckdb/common/row_operations/row_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ struct RowOperations {
static void Gather(Vector &rows, const SelectionVector &row_sel, Vector &col, const SelectionVector &col_sel,
const idx_t count, const RowLayout &layout, const idx_t col_no, const idx_t build_size = 0,
data_ptr_t heap_ptr = nullptr);
//! Full Scan an entire columns
static void FullScanColumn(const TupleDataLayout &layout, Vector &rows, Vector &col, idx_t count, idx_t col_idx);

//===--------------------------------------------------------------------===//
// Comparison Operators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class DataChunk;
class DynamicTableFilterSet;
struct GlobalUngroupedAggregateState;
struct LocalUngroupedAggregateState;
class JoinHashTable;

struct JoinFilterPushdownColumn {
//! The probe column index to which this filter should be applied
Expand Down Expand Up @@ -58,7 +59,12 @@ struct JoinFilterPushdownInfo {

void Sink(DataChunk &chunk, JoinFilterLocalState &lstate) const;
void Combine(JoinFilterGlobalState &gstate, JoinFilterLocalState &lstate) const;
void PushFilters(JoinFilterGlobalState &gstate, const PhysicalOperator &op) const;
void PushFilters(ClientContext &context, JoinHashTable &ht, JoinFilterGlobalState &gstate,
const PhysicalOperator &op) const;

private:
void PushInFilter(const JoinFilterPushdownFilter &info, JoinHashTable &ht, const PhysicalOperator &op,
idx_t filter_idx, idx_t filter_col_idx) const;
};

} // namespace duckdb
7 changes: 5 additions & 2 deletions src/include/duckdb/main/client_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ struct ClientConfig {
//! The threshold at which we switch from using filtered aggregates to LIST with a dedicated pivot operator
idx_t pivot_filter_threshold = 20;

//! The maximum amount of OR filters we generate dynamically from a hash join
idx_t dynamic_or_filter_threshold = 50;

//! Whether or not the "/" division operator defaults to integer division or floating point division
bool integer_division = false;
//! When a scalar subquery returns multiple rows - return a random row instead of returning an error
Expand Down Expand Up @@ -171,12 +174,12 @@ struct ClientConfig {
}

template <class OP>
typename OP::RETURN_TYPE GetSetting(const ClientContext &context) {
static typename OP::RETURN_TYPE GetSetting(const ClientContext &context) {
return OP::GetSetting(context).template GetValue<typename OP::RETURN_TYPE>();
}

template <class OP>
Value GetSettingValue(const ClientContext &context) {
static Value GetSettingValue(const ClientContext &context) {
return OP::GetSetting(context);
}

Expand Down
11 changes: 11 additions & 0 deletions src/include/duckdb/main/settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,17 @@ struct DuckDBAPISetting {
static Value GetSetting(const ClientContext &context);
};

struct DynamicOrFilterThresholdSetting {
using RETURN_TYPE = idx_t;
static constexpr const char *Name = "dynamic_or_filter_threshold";
static constexpr const char *Description =
"The maximum amount of OR filters we generate dynamically from a hash join";
static constexpr const char *InputType = "UBIGINT";
static void SetLocal(ClientContext &context, const Value &parameter);
static void ResetLocal(ClientContext &context);
static Value GetSetting(const ClientContext &context);
};

struct EnableExternalAccessSetting {
using RETURN_TYPE = bool;
static constexpr const char *Name = "enable_external_access";
Expand Down
4 changes: 4 additions & 0 deletions src/include/duckdb/optimizer/filter_combiner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class FilterCombiner {

FilterResult AddFilter(unique_ptr<Expression> expr);

//! Returns whether or not a set of integral values is a dense range (i.e. 1, 2, 3, 4, 5)
//! If this returns true - this sorts "in_list" as a side-effect
static bool IsDenseRange(vector<Value> &in_list);

void GenerateFilters(const std::function<void(unique_ptr<Expression> filter)> &callback);
bool HasFilters();
TableFilterSet GenerateTableScanFilters(const vector<ColumnIndex> &column_ids);
Expand Down
35 changes: 35 additions & 0 deletions src/include/duckdb/planner/filter/in_filter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// duckdb/planner/filter/in_filter.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/planner/table_filter.hpp"
#include "duckdb/common/types/value.hpp"

namespace duckdb {

class InFilter : public TableFilter {
public:
static constexpr const TableFilterType TYPE = TableFilterType::IN_FILTER;

public:
explicit InFilter(vector<Value> values);

vector<Value> values;

public:
FilterPropagateResult CheckStatistics(BaseStatistics &stats) override;
string ToString(const string &column_name) override;
bool Equals(const TableFilter &other) const override;
unique_ptr<TableFilter> Copy() const override;
unique_ptr<Expression> ToExpression(const Expression &column) const override;
void Serialize(Serializer &serializer) const override;
static unique_ptr<TableFilter> Deserialize(Deserializer &deserializer);
};

} // namespace duckdb
Loading

0 comments on commit b470dea

Please sign in to comment.