Skip to content

Commit

Permalink
Merge branch 'master' into hawkfish-evaluate-const
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Wesley committed May 13, 2021
2 parents 48126a6 + 18b0f76 commit 87dca79
Show file tree
Hide file tree
Showing 49 changed files with 1,090 additions and 599 deletions.
2 changes: 1 addition & 1 deletion extension/parquet/include/parquet_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Allocator;
class ClientContext;
class ChunkCollection;
class BaseStatistics;
struct TableFilterSet;
class TableFilterSet;

struct ParquetReaderScanState {
vector<idx_t> group_idx_list;
Expand Down
154 changes: 84 additions & 70 deletions extension/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "duckdb.hpp"
#ifndef DUCKDB_AMALGAMATION
#include "duckdb/planner/table_filter.hpp"
#include "duckdb/planner/filter/constant_filter.hpp"
#include "duckdb/planner/filter/null_filter.hpp"
#include "duckdb/planner/filter/conjunction_filter.hpp"
#include "duckdb/common/file_system.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/common/types/date.hpp"
Expand Down Expand Up @@ -348,38 +351,10 @@ void ParquetReader::PrepareRowGroupBuffer(ParquetReaderScanState &state, idx_t o
auto filter_entry = state.filters->filters.find(out_col_idx);
if (stats && filter_entry != state.filters->filters.end()) {
bool skip_chunk = false;
switch (column_reader->Type().id()) {
case LogicalTypeId::UTINYINT:
case LogicalTypeId::USMALLINT:
case LogicalTypeId::UINTEGER:
case LogicalTypeId::UBIGINT:
case LogicalTypeId::INTEGER:
case LogicalTypeId::BIGINT:
case LogicalTypeId::FLOAT:
case LogicalTypeId::TIMESTAMP:
case LogicalTypeId::DOUBLE: {
auto &num_stats = (NumericStatistics &)*stats;
for (auto &filter : filter_entry->second) {
skip_chunk = !num_stats.CheckZonemap(filter.comparison_type, filter.constant);
if (skip_chunk) {
break;
}
}
break;
}
case LogicalTypeId::BLOB:
case LogicalTypeId::VARCHAR: {
auto &str_stats = (StringStatistics &)*stats;
for (auto &filter : filter_entry->second) {
skip_chunk = !str_stats.CheckZonemap(filter.comparison_type, filter.constant.str_value);
if (skip_chunk) {
break;
}
}
break;
}
default:
break;
auto &filter = *filter_entry->second;
auto prune_result = filter.CheckStatistics(*stats);
if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
skip_chunk = true;
}
if (skip_chunk) {
state.group_offset = group.num_rows;
Expand Down Expand Up @@ -417,6 +392,26 @@ void ParquetReader::InitializeScan(ParquetReaderScanState &state, vector<column_
state.repeat_buf.resize(allocator, STANDARD_VECTOR_SIZE);
}

// Restrict filter_mask to rows whose value is NULL.
// If the vector contains no NULLs at all, no row can satisfy IS NULL,
// so the whole mask is cleared.
void FilterIsNull(Vector &v, parquet_filter_t &filter_mask, idx_t count) {
	auto &validity = FlatVector::Validity(v);
	if (!validity.AllValid()) {
		// drop every selected row that holds a valid (non-NULL) value
		for (idx_t row = 0; row < count; row++) {
			if (validity.RowIsValid(row)) {
				filter_mask[row] = false;
			}
		}
		return;
	}
	filter_mask.reset();
}

// Restrict filter_mask to rows whose value is not NULL.
void FilterIsNotNull(Vector &v, parquet_filter_t &filter_mask, idx_t count) {
	auto &validity = FlatVector::Validity(v);
	if (validity.AllValid()) {
		// no NULLs present: every currently selected row already passes
		return;
	}
	for (idx_t row = 0; row < count; row++) {
		if (!validity.RowIsValid(row)) {
			filter_mask[row] = false;
		}
	}
}

template <class T, class OP>
void TemplatedFilterOperation(Vector &v, T constant, parquet_filter_t &filter_mask, idx_t count) {
D_ASSERT(v.GetVectorType() == VectorType::FLAT_VECTOR); // we just created the damn thing it better be
Expand All @@ -426,7 +421,9 @@ void TemplatedFilterOperation(Vector &v, T constant, parquet_filter_t &filter_ma

if (!mask.AllValid()) {
for (idx_t i = 0; i < count; i++) {
filter_mask[i] = filter_mask[i] && mask.RowIsValid(i) && OP::Operation(v_ptr[i], constant);
if (mask.RowIsValid(i)) {
filter_mask[i] = filter_mask[i] && OP::Operation(v_ptr[i], constant);
}
}
} else {
for (idx_t i = 0; i < count; i++) {
Expand All @@ -444,53 +441,95 @@ static void FilterOperationSwitch(Vector &v, Value &constant, parquet_filter_t &
case LogicalTypeId::BOOLEAN:
TemplatedFilterOperation<bool, OP>(v, constant.value_.boolean, filter_mask, count);
break;

case LogicalTypeId::UTINYINT:
TemplatedFilterOperation<uint8_t, OP>(v, constant.value_.utinyint, filter_mask, count);
break;

case LogicalTypeId::USMALLINT:
TemplatedFilterOperation<uint16_t, OP>(v, constant.value_.usmallint, filter_mask, count);
break;

case LogicalTypeId::UINTEGER:
TemplatedFilterOperation<uint32_t, OP>(v, constant.value_.uinteger, filter_mask, count);
break;

case LogicalTypeId::UBIGINT:
TemplatedFilterOperation<uint64_t, OP>(v, constant.value_.ubigint, filter_mask, count);
break;

case LogicalTypeId::INTEGER:
TemplatedFilterOperation<int32_t, OP>(v, constant.value_.integer, filter_mask, count);
break;

case LogicalTypeId::BIGINT:
TemplatedFilterOperation<int64_t, OP>(v, constant.value_.bigint, filter_mask, count);
break;

case LogicalTypeId::FLOAT:
TemplatedFilterOperation<float, OP>(v, constant.value_.float_, filter_mask, count);
break;

case LogicalTypeId::DOUBLE:
TemplatedFilterOperation<double, OP>(v, constant.value_.double_, filter_mask, count);
break;

case LogicalTypeId::TIMESTAMP:
TemplatedFilterOperation<timestamp_t, OP>(v, constant.value_.timestamp, filter_mask, count);
break;

case LogicalTypeId::BLOB:
case LogicalTypeId::VARCHAR:
TemplatedFilterOperation<string_t, OP>(v, string_t(constant.str_value), filter_mask, count);
break;

default:
throw NotImplementedException("Unsupported type for filter %s", v.ToString());
}
}

// Apply a (possibly nested) table filter to a freshly read vector, clearing
// filter_mask entries for rows that do not satisfy the filter. Filters only
// ever narrow the mask: a bit that is already false stays false.
static void ApplyFilter(Vector &v, TableFilter &filter, parquet_filter_t &filter_mask, idx_t count) {
	switch (filter.filter_type) {
	case TableFilterType::CONJUNCTION_AND: {
		// AND: each child further restricts the same mask
		auto &conjunction = (ConjunctionAndFilter &)filter;
		for (auto &child_filter : conjunction.child_filters) {
			ApplyFilter(v, *child_filter, filter_mask, count);
		}
		break;
	}
	case TableFilterType::CONJUNCTION_OR: {
		// OR: a row survives if it passes at least one child filter.
		// Each child is evaluated against a copy of the incoming mask and the
		// results are accumulated into a separate, initially all-false mask.
		// NOTE: OR-ing the child results into filter_mask itself would be a
		// no-op, because every child result is a subset of the incoming mask;
		// the accumulator must start empty and be AND-ed back at the end.
		auto &conjunction = (ConjunctionOrFilter &)filter;
		parquet_filter_t or_mask; // default-constructed: all bits false
		for (auto &child_filter : conjunction.child_filters) {
			parquet_filter_t child_mask = filter_mask;
			ApplyFilter(v, *child_filter, child_mask, count);
			or_mask |= child_mask;
		}
		filter_mask &= or_mask;
		break;
	}
	case TableFilterType::CONSTANT_COMPARISON: {
		// leaf comparison against a constant: dispatch on the comparison type
		auto &constant_filter = (ConstantFilter &)filter;
		switch (constant_filter.comparison_type) {
		case ExpressionType::COMPARE_EQUAL:
			FilterOperationSwitch<Equals>(v, constant_filter.constant, filter_mask, count);
			break;
		case ExpressionType::COMPARE_LESSTHAN:
			FilterOperationSwitch<LessThan>(v, constant_filter.constant, filter_mask, count);
			break;
		case ExpressionType::COMPARE_LESSTHANOREQUALTO:
			FilterOperationSwitch<LessThanEquals>(v, constant_filter.constant, filter_mask, count);
			break;
		case ExpressionType::COMPARE_GREATERTHAN:
			FilterOperationSwitch<GreaterThan>(v, constant_filter.constant, filter_mask, count);
			break;
		case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
			FilterOperationSwitch<GreaterThanEquals>(v, constant_filter.constant, filter_mask, count);
			break;
		default:
			D_ASSERT(0);
		}
		break;
	}
	case TableFilterType::IS_NOT_NULL:
		FilterIsNotNull(v, filter_mask, count);
		break;
	case TableFilterType::IS_NULL:
		FilterIsNull(v, filter_mask, count);
		break;
	default:
		D_ASSERT(0);
		break;
	}
}

void ParquetReader::Scan(ParquetReaderScanState &state, DataChunk &result) {
while (ScanInternal(state, result)) {
if (result.size() > 0) {
Expand Down Expand Up @@ -563,32 +602,7 @@ bool ParquetReader::ScanInternal(ParquetReaderScanState &state, DataChunk &resul

need_to_read[filter_col.first] = false;

for (auto &filter : filter_col.second) {
switch (filter.comparison_type) {
case ExpressionType::COMPARE_EQUAL:
FilterOperationSwitch<Equals>(result.data[filter_col.first], filter.constant, filter_mask,
this_output_chunk_rows);
break;
case ExpressionType::COMPARE_LESSTHAN:
FilterOperationSwitch<LessThan>(result.data[filter_col.first], filter.constant, filter_mask,
this_output_chunk_rows);
break;
case ExpressionType::COMPARE_LESSTHANOREQUALTO:
FilterOperationSwitch<LessThanEquals>(result.data[filter_col.first], filter.constant, filter_mask,
this_output_chunk_rows);
break;
case ExpressionType::COMPARE_GREATERTHAN:
FilterOperationSwitch<GreaterThan>(result.data[filter_col.first], filter.constant, filter_mask,
this_output_chunk_rows);
break;
case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
FilterOperationSwitch<GreaterThanEquals>(result.data[filter_col.first], filter.constant,
filter_mask, this_output_chunk_rows);
break;
default:
D_ASSERT(0);
}
}
ApplyFilter(result.data[filter_col.first], *filter_col.second, filter_mask, this_output_chunk_rows);
}

// we still may have to read some cols
Expand Down
6 changes: 4 additions & 2 deletions extension/parquet/parquetcli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#ifndef DUCKDB_AMALGAMATION
#include "parquet_reader.hpp"
#include "duckdb/planner/table_filter.hpp"
#include "duckdb/planner/filter/constant_filter.hpp"
#else
#include "parquet-amalgamation.hpp"
#endif
Expand Down Expand Up @@ -76,8 +77,9 @@ int main(int argc, const char **argv) {
PrintUsage();
}
auto idx = entry->second;
TableFilter filter(Value(splits[1]).CastAs(return_types[idx]), ExpressionType::COMPARE_EQUAL, idx);
filters.filters[idx].push_back(filter);
auto filter =
make_unique<ConstantFilter>(ExpressionType::COMPARE_EQUAL, Value(splits[1]).CastAs(return_types[idx]));
filters.filters[idx] = move(filter);
}

ParquetReaderScanState state;
Expand Down
5 changes: 4 additions & 1 deletion scripts/amalgamation.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def add_include_dir(dirpath):
'duckdb/storage/object_cache.hpp',
'duckdb/planner/table_filter.hpp',
"duckdb/storage/statistics/string_statistics.hpp",
"duckdb/storage/statistics/numeric_statistics.hpp"]]
"duckdb/storage/statistics/numeric_statistics.hpp",
"duckdb/planner/filter/conjunction_filter.hpp",
"duckdb/planner/filter/constant_filter.hpp",
"duckdb/planner/filter/null_filter.hpp"]]
main_header_files += add_include_dir(os.path.join(include_dir, 'duckdb/parser/expression'))
main_header_files += add_include_dir(os.path.join(include_dir, 'duckdb/parser/parsed_data'))
main_header_files += add_include_dir(os.path.join(include_dir, 'duckdb/parser/tableref'))
Expand Down
1 change: 1 addition & 0 deletions src/common/types/hugeint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "duckdb/common/limits.hpp"

#include <cmath>
#include <limits>

namespace duckdb {

Expand Down
11 changes: 5 additions & 6 deletions src/execution/operator/scan/physical_table_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,11 @@ string PhysicalTableScan::ParamsToString() const {
result += "\n[INFOSEPARATOR]\n";
result += "Filters: ";
for (auto &f : table_filters->filters) {
for (auto &filter : f.second) {
if (filter.column_index < names.size()) {
result += "\n";
result += names[column_ids[filter.column_index]] +
ExpressionTypeToOperator(filter.comparison_type) + filter.constant.ToString();
}
auto &column_index = f.first;
auto &filter = f.second;
if (column_index < names.size()) {
result += filter->ToString(names[column_ids[column_index]]);
result += "\n";
}
}
}
Expand Down
19 changes: 7 additions & 12 deletions src/execution/physical_plan/plan_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,26 @@

namespace duckdb {

// Build a TableFilterSet keyed on column indices relative to this scan's
// projection (column_ids), from a filter set keyed on absolute table column
// indices. The filters themselves are moved, not copied.
// NOTE(review): this span of the diff view interleaves the deleted old
// implementation (FindColumnIndex) with the added one; this is the
// reconstructed post-merge version.
unique_ptr<TableFilterSet> CreateTableFilterSet(TableFilterSet &table_filters, vector<column_t> &column_ids) {
	// create the table filter map
	auto table_filter_set = make_unique<TableFilterSet>();
	for (auto &table_filter : table_filters.filters) {
		// find the relative column index from the absolute column index into the table
		idx_t column_index = INVALID_INDEX;
		for (idx_t i = 0; i < column_ids.size(); i++) {
			if (table_filter.first == column_ids[i]) {
				column_index = i;
				break;
			}
		}
		if (column_index == INVALID_INDEX) {
			throw InternalException("Could not find column index for table filter");
		}
		// transfer ownership of the filter to the relative-index slot
		table_filter_set->filters[column_index] = move(table_filter.second);
	}
	return table_filter_set;
}

unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalGet &op) {
if (!op.children.empty()) {
// this is for table producing functions that consume subquery results
Expand All @@ -46,8 +41,8 @@ unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalGet &op) {
}

unique_ptr<TableFilterSet> table_filters;
if (!op.table_filters.empty()) {
table_filters = FindColumnIndex(op.table_filters, op.column_ids);
if (!op.table_filters.filters.empty()) {
table_filters = CreateTableFilterSet(op.table_filters, op.column_ids);
}

if (op.function.dependency) {
Expand Down
23 changes: 23 additions & 0 deletions src/include/duckdb/common/enums/filter_propagate_result.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// duckdb/common/enums/filter_propagate_result.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/common/constants.hpp"

namespace duckdb {

// Result of checking a table filter against statistics (e.g. zone maps):
// whether the filter can be decided for an entire chunk without scanning rows.
enum class FilterPropagateResult : uint8_t {
	// statistics are insufficient to decide; evaluate the filter per row
	NO_PRUNING_POSSIBLE = 0,
	// every row passes the filter; the filter need not be evaluated
	FILTER_ALWAYS_TRUE = 1,
	// no row can pass the filter; the chunk can be skipped entirely
	FILTER_ALWAYS_FALSE = 2,
	// each row is either TRUE or NULL under the filter
	FILTER_TRUE_OR_NULL = 3,
	// each row is either FALSE or NULL under the filter
	FILTER_FALSE_OR_NULL = 4
};

} // namespace duckdb
1 change: 0 additions & 1 deletion src/include/duckdb/execution/adaptive_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

#include <random>
namespace duckdb {
struct TableFilter;

class AdaptiveFilter {
public:
Expand Down
2 changes: 1 addition & 1 deletion src/include/duckdb/function/table_function.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace duckdb {
class BaseStatistics;
class LogicalGet;
struct ParallelState;
struct TableFilterSet;
class TableFilterSet;

struct FunctionOperatorData {
virtual ~FunctionOperatorData() {
Expand Down
Loading

0 comments on commit 87dca79

Please sign in to comment.