Skip to content

Commit

Permalink
Support orc filter push down (file + stripe + rowgroup level) (#55330)
Browse files Browse the repository at this point in the history
* support orc filter push down

* update orc lib version

* replace setqueryinfo with setkeycondition

* fix issue #53536

* refactor source with key condition

* fix building error

* remove std::cout

* update orc

* update orc version

* fix bugs

* improve code

* upgrade orc lib

* fix code style

* change as requested

* add performance tests for orc filter push down

* add performance tests for orc filter push down

* fix all bugs

* fix default as null issue

* add uts for null as default issues

* upgrade orc lib

* fix failed orc lib uts and fix typo

* fix failed uts

* fix failed uts

* fix ast fuzzer tests

* fix bug of uint64 overflow in https://s3.amazonaws.com/clickhouse-test-reports/55330/de22fdcaea2e12c96f300e95f59beba84401712d/fuzzer_astfuzzerubsan/report.html

* fix asan fatal caused by reused column vector batch in native orc input format. refer to https://s3.amazonaws.com/clickhouse-test-reports/55330/be39d23af2d7e27f5ec7f168947cf75aeaabf674/stateless_tests__asan__[4_4].htm

* fix wrong performance tests

* disable 02892_orc_filter_pushdown on aarch64. https://s3.amazonaws.com/clickhouse-test-reports/55330/be39d23af2d7e27f5ec7f168947cf75aeaabf674/stateless_tests__aarch64_.html

* add some comments

* add some comments

* inline range::equals and range::less

* fix data race of key condition

* trigger ci
  • Loading branch information
taiyang-li committed Oct 24, 2023
1 parent 57df567 commit 465962d
Show file tree
Hide file tree
Showing 36 changed files with 1,204 additions and 89 deletions.
15 changes: 6 additions & 9 deletions src/Core/Range.cpp
Expand Up @@ -89,17 +89,14 @@ void Range::shrinkToIncludedIfPossible()
}
}

namespace
bool Range::equals(const Field & lhs, const Field & rhs)
{
inline bool equals(const Field & lhs, const Field & rhs)
{
return applyVisitor(FieldVisitorAccurateEquals(), lhs, rhs);
}
return applyVisitor(FieldVisitorAccurateEquals(), lhs, rhs);
}

inline bool less(const Field & lhs, const Field & rhs)
{
return applyVisitor(FieldVisitorAccurateLess(), lhs, rhs);
}
bool Range::less(const Field & lhs, const Field & rhs)
{
return applyVisitor(FieldVisitorAccurateLess(), lhs, rhs);
}

bool Range::empty() const
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Range.h
Expand Up @@ -59,6 +59,9 @@ struct Range
static Range createRightBounded(const FieldRef & right_point, bool right_included, bool with_null = false);
static Range createLeftBounded(const FieldRef & left_point, bool left_included, bool with_null = false);

static ALWAYS_INLINE bool equals(const Field & lhs, const Field & rhs);
static ALWAYS_INLINE bool less(const Field & lhs, const Field & rhs);

/** Optimize the range. If it has an open boundary and the Field type is "loose"
* - then convert it to closed, narrowing by one.
* That is, for example, turn (0,2) into [1].
Expand Down
2 changes: 2 additions & 0 deletions src/Core/Settings.h
Expand Up @@ -900,6 +900,7 @@ class IColumn;
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
M(Bool, input_format_orc_allow_missing_columns, false, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_orc_use_fast_decoder, true, "Use a faster ORC decoder implementation.", 0) \
M(Bool, input_format_orc_filter_push_down, true, "When reading ORC files, skip whole stripes or row groups based on the WHERE/PREWHERE expressions, min/max statistics or bloom filter in the ORC metadata.", 0) \
M(Bool, input_format_parquet_allow_missing_columns, false, "Allow missing columns while reading Parquet input formats", 0) \
M(UInt64, input_format_parquet_local_file_min_bytes_for_seek, 8192, "Min bytes required for local read (file) to do seek, instead of read with ignore in Parquet input format", 0) \
M(Bool, input_format_arrow_allow_missing_columns, false, "Allow missing columns while reading Arrow input formats", 0) \
Expand Down Expand Up @@ -1051,6 +1052,7 @@ class IColumn;
\
M(Bool, output_format_orc_string_as_string, false, "Use ORC String type instead of Binary for String columns", 0) \
M(ORCCompression, output_format_orc_compression_method, "lz4", "Compression method for ORC output format. Supported codecs: lz4, snappy, zlib, zstd, none (uncompressed)", 0) \
M(UInt64, output_format_orc_row_index_stride, 10'000, "Target row index stride in ORC output format", 0) \
\
M(CapnProtoEnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::CapnProtoEnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
\
Expand Down
2 changes: 2 additions & 0 deletions src/Formats/FormatFactory.cpp
Expand Up @@ -194,7 +194,9 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.orc.case_insensitive_column_matching = settings.input_format_orc_case_insensitive_column_matching;
format_settings.orc.output_string_as_string = settings.output_format_orc_string_as_string;
format_settings.orc.output_compression_method = settings.output_format_orc_compression_method;
format_settings.orc.output_row_index_stride = settings.output_format_orc_row_index_stride;
format_settings.orc.use_fast_decoder = settings.input_format_orc_use_fast_decoder;
format_settings.orc.filter_push_down = settings.input_format_orc_filter_push_down;
format_settings.defaults_for_omitted_fields = settings.input_format_defaults_for_omitted_fields;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
format_settings.capn_proto.skip_fields_with_unsupported_types_in_schema_inference = settings.input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference;
Expand Down
2 changes: 2 additions & 0 deletions src/Formats/FormatSettings.h
Expand Up @@ -364,6 +364,8 @@ struct FormatSettings
bool output_string_as_string = false;
ORCCompression output_compression_method = ORCCompression::NONE;
bool use_fast_decoder = true;
bool filter_push_down = true;
UInt64 output_row_index_stride = 10'000;
} orc;

/// For capnProto format we should determine how to
Expand Down
4 changes: 3 additions & 1 deletion src/Interpreters/Set.h
Expand Up @@ -198,7 +198,7 @@ using FunctionPtr = std::shared_ptr<IFunction>;
*/
struct FieldValue
{
FieldValue(MutableColumnPtr && column_) : column(std::move(column_)) {}
explicit FieldValue(MutableColumnPtr && column_) : column(std::move(column_)) {}
void update(const Field & x);

bool isNormal() const { return !value.isPositiveInfinity() && !value.isNegativeInfinity(); }
Expand Down Expand Up @@ -230,6 +230,8 @@ class MergeTreeSetIndex

size_t size() const { return ordered_set.at(0)->size(); }

const Columns & getOrderedSet() const { return ordered_set; }

bool hasMonotonicFunctionsChain() const;

BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types, bool single_point = false) const;
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/IInputFormat.cpp
Expand Up @@ -6,7 +6,7 @@ namespace DB
{

IInputFormat::IInputFormat(Block header, ReadBuffer * in_)
: ISource(std::move(header)), in(in_)
: SourceWithKeyCondition(std::move(header)), in(in_)
{
column_mapping = std::make_shared<ColumnMapping>();
}
Expand Down
13 changes: 5 additions & 8 deletions src/Processors/Formats/IInputFormat.h
@@ -1,10 +1,11 @@
#pragma once

#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Processors/ISource.h>
#include <Formats/ColumnMapping.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
#include <Formats/ColumnMapping.h>
#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Processors/SourceWithKeyCondition.h>
#include <Storages/MergeTree/KeyCondition.h>


namespace DB
Expand All @@ -16,7 +17,7 @@ using ColumnMappingPtr = std::shared_ptr<ColumnMapping>;

/** Input format is a source, that reads data from ReadBuffer.
*/
class IInputFormat : public ISource
class IInputFormat : public SourceWithKeyCondition
{
protected:

Expand All @@ -26,10 +27,6 @@ class IInputFormat : public ISource
/// ReadBuffer can be nullptr for random-access formats.
IInputFormat(Block header, ReadBuffer * in_);

/// If the format is used by a SELECT query, this method may be called.
/// The format may use it for filter pushdown.
virtual void setQueryInfo(const SelectQueryInfo &, ContextPtr) {}

/** In some usecase (hello Kafka) we need to read a lot of tiny streams in exactly the same format.
* The recreating of parser for each small stream takes too long, so we introduce a method
* resetParser() which allow to reset the state of parser to continue reading of
Expand Down

0 comments on commit 465962d

Please sign in to comment.