diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 4f9abb306274..3edb6ff6c1b8 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -1078,6 +1078,7 @@ class IColumn;
     M(Bool, input_format_parquet_preserve_order, false, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
     M(Bool, input_format_parquet_filter_push_down, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and min/max statistics in the Parquet metadata.", 0) \
     M(Bool, input_format_parquet_use_native_reader, false, "When reading Parquet files, to use native reader instead of arrow reader.", 0) \
+    M(Bool, input_format_parquet_bloom_filter_push_down, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata.", 0) \
     M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
     M(Bool, input_format_orc_allow_missing_columns, true, "Allow missing columns while reading ORC input formats", 0) \
     M(Bool, input_format_orc_use_fast_decoder, true, "Use a faster ORC decoder implementation.", 0) \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index b3e7a59c5e2f..ee2cce99ff2b 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -75,6 +75,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> settings_changes_history_initializer =
+            {"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h
--- a/src/Formats/FormatSettings.h
+++ b/src/Formats/FormatSettings.h
@@ ... @@ struct Parquet
+        bool bloom_filter_push_down = true;
         std::unordered_set<int> skip_row_groups = {};
         bool output_string_as_string = false;
         bool output_fixed_string_as_fixed_byte_array = true;
diff --git a/src/Interpreters/Set.h b/src/Interpreters/Set.h
index 8b44f36b278d..45c570a9bed7 100644
--- a/src/Interpreters/Set.h
+++ b/src/Interpreters/Set.h
@@ -239,6 +239,8 @@ class MergeTreeSetIndex
     const Columns & getOrderedSet() const { return ordered_set; }
 
+    const std::vector<KeyTuplePositionMapping> & getIndexesMapping() const { return indexes_mapping; }
+
 private:
     // If all arguments in tuple are key columns, we can optimize NOT IN when there is only one element.
     bool has_all_keys;
diff --git a/src/Processors/Formats/Impl/ArrowFieldIndexUtil.h b/src/Processors/Formats/Impl/ArrowFieldIndexUtil.h
index 24ffdc105811..fca954b6cfb4 100644
--- a/src/Processors/Formats/Impl/ArrowFieldIndexUtil.h
+++ b/src/Processors/Formats/Impl/ArrowFieldIndexUtil.h
@@ -15,6 +15,7 @@
 #include
 #include
 #include
+#include <parquet/metadata.h>
 
 namespace arrow
@@ -65,11 +66,22 @@ class ArrowFieldIndexUtil
         return result;
     }
 
+    // For a parquet schema {x: {i: int, j: int}}, this should be populated as follows:
+    // clickhouse_index = 0, parquet_indexes = {0, 1}
+    struct ClickHouseIndexToParquetIndex
+    {
+        std::size_t clickhouse_index;
+        std::vector<int> parquet_indexes;
+    };
+
     /// Only collect the required fields' indices. Eg. when just read a field of a struct,
     /// don't need to collect the whole indices in this struct.
-    std::vector<int> findRequiredIndices(const Block & header, const arrow::Schema & schema)
+    std::vector<ClickHouseIndexToParquetIndex> findRequiredIndices(
+        const Block & header,
+        const arrow::Schema & schema,
+        const parquet::FileMetaData & file)
     {
-        std::vector<int> required_indices;
+        std::vector<ClickHouseIndexToParquetIndex> required_indices;
         std::unordered_set<int> added_indices;
         /// Flat all named fields' index information into a map.
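         /// Illustrative (hypothetical) example: for a schema {x: {i: int, j: int}, y: int} the map
         /// holds entries like "x.i" -> (0, 1), "x.j" -> (1, 1), "y" -> (2, 1), i.e. each flattened
         /// field name maps to (first parquet leaf index, number of leaves).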
         auto fields_indices = calculateFieldIndices(schema);
@@ -79,7 +91,7 @@ class ArrowFieldIndexUtil
             std::string col_name = named_col.name;
             if (ignore_case)
                 boost::to_lower(col_name);
-            findRequiredIndices(col_name, named_col.type, fields_indices, added_indices, required_indices);
+            findRequiredIndices(col_name, i, named_col.type, fields_indices, added_indices, required_indices, file);
         }
         return required_indices;
     }
@@ -169,10 +181,12 @@ class ArrowFieldIndexUtil
     void findRequiredIndices(
         const String & name,
+        std::size_t header_index,
         DataTypePtr data_type,
         const std::unordered_map<std::string, std::pair<int, int>> & field_indices,
         std::unordered_set<int> & added_indices,
-        std::vector<int> & required_indices)
+        std::vector<ClickHouseIndexToParquetIndex> & required_indices,
+        const parquet::FileMetaData & file)
     {
         auto nested_type = removeNullable(data_type);
         if (const DB::DataTypeTuple * type_tuple = typeid_cast<const DB::DataTypeTuple *>(nested_type.get()))
         {
@@ -187,20 +201,20 @@ class ArrowFieldIndexUtil
                 if (ignore_case)
                     boost::to_lower(field_name);
                 const auto & field_type = field_types[i];
-                findRequiredIndices(Nested::concatenateName(name, field_name), field_type, field_indices, added_indices, required_indices);
+                findRequiredIndices(Nested::concatenateName(name, field_name), header_index, field_type, field_indices, added_indices, required_indices, file);
             }
             return;
         }
         }
         else if (const auto * type_array = typeid_cast<const DataTypeArray *>(nested_type.get()))
         {
-            findRequiredIndices(name, type_array->getNestedType(), field_indices, added_indices, required_indices);
+            findRequiredIndices(name, header_index, type_array->getNestedType(), field_indices, added_indices, required_indices, file);
             return;
         }
         else if (const auto * type_map = typeid_cast<const DataTypeMap *>(nested_type.get()))
         {
-            findRequiredIndices(name, type_map->getKeyType(), field_indices, added_indices, required_indices);
-            findRequiredIndices(name, type_map->getValueType(), field_indices, added_indices, required_indices);
+            findRequiredIndices(name, header_index, type_map->getKeyType(), field_indices, added_indices, required_indices, file);
+            findRequiredIndices(name, header_index, type_map->getValueType(), field_indices, added_indices, required_indices, file);
             return;
         }
         auto it = field_indices.find(name);
         if (it == field_indices.end())
         {
@@ -211,14 +225,18 @@ class ArrowFieldIndexUtil
         }
         else
         {
+            ClickHouseIndexToParquetIndex index_mapping;
+            index_mapping.clickhouse_index = header_index;
             for (int j = 0; j < it->second.second; ++j)
             {
                 auto index = it->second.first + j;
                 if (added_indices.insert(index).second)
                 {
-                    required_indices.emplace_back(index);
+                    index_mapping.parquet_indexes.emplace_back(index);
                 }
             }
+
+            required_indices.emplace_back(index_mapping);
         }
     }
 };
diff --git a/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.cpp b/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.cpp
new file mode 100644
index 000000000000..75eeb15a5190
--- /dev/null
+++ b/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.cpp
@@ -0,0 +1,525 @@
+#include <Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h>
+#include "config.h"
+
+#if USE_PARQUET
+
+#include <Columns/ColumnNullable.h>
+#include <Interpreters/Set.h>
+#include <parquet/bloom_filter.h>
+#include <parquet/xxhasher.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+namespace
+{
+
+bool isParquetStringTypeSupportedForBloomFilters(
+    const std::shared_ptr<const parquet::LogicalType> & logical_type,
+    parquet::ConvertedType::type converted_type)
+{
+    if (logical_type &&
+        !logical_type->is_none()
+        && !(logical_type->is_string() || logical_type->is_BSON() || logical_type->is_JSON()))
+    {
+        return false;
+    }
+
+    if (parquet::ConvertedType::type::NONE != converted_type &&
+        !(converted_type == parquet::ConvertedType::JSON || converted_type == parquet::ConvertedType::UTF8
+          || converted_type == parquet::ConvertedType::BSON))
+    {
+        return false;
+    }
+
+    return true;
+}
+
+bool isParquetIntegerTypeSupportedForBloomFilters(const std::shared_ptr<const parquet::LogicalType> & logical_type, parquet::ConvertedType::type converted_type)
+{
+    if (logical_type && !logical_type->is_none() && !logical_type->is_int())
+    {
+        return false;
+    }
+
+    if (parquet::ConvertedType::type::NONE != converted_type && !(converted_type == parquet::ConvertedType::INT_8 || converted_type == parquet::ConvertedType::INT_16
+        || converted_type == parquet::ConvertedType::INT_32 || converted_type == parquet::ConvertedType::INT_64
+        || converted_type == parquet::ConvertedType::UINT_8 || converted_type == parquet::ConvertedType::UINT_16
+        || converted_type == parquet::ConvertedType::UINT_32 || converted_type == parquet::ConvertedType::UINT_64))
+    {
+        return false;
+    }
+
+    return true;
+}
+
+template <typename T>
+uint64_t hashSpecialFLBATypes(const Field & field)
+{
+    const T & value = field.safeGet<T>();
+
+    parquet::FLBA flba(reinterpret_cast<const uint8_t *>(&value));
+
+    parquet::XxHasher hasher;
+
+    return hasher.Hash(&flba, sizeof(T));
+};
+
+std::optional<uint64_t> tryHashStringWithoutCompatibilityCheck(const Field & field)
+{
+    const auto field_type = field.getType();
+
+    if (field_type != Field::Types::Which::String)
+    {
+        return std::nullopt;
+    }
+
+    parquet::XxHasher hasher;
+    parquet::ByteArray ba { field.safeGet<std::string>() };
+
+    return hasher.Hash(&ba);
+}
+
+std::optional<uint64_t> tryHashString(
+    const Field & field,
+    const std::shared_ptr<const parquet::LogicalType> & logical_type,
+    parquet::ConvertedType::type converted_type)
+{
+    if (!isParquetStringTypeSupportedForBloomFilters(logical_type, converted_type))
+    {
+        return std::nullopt;
+    }
+
+    return tryHashStringWithoutCompatibilityCheck(field);
+}
+
+std::optional<uint64_t> tryHashFLBA(
+    const Field & field,
+    const std::shared_ptr<const parquet::LogicalType> & logical_type,
+    parquet::ConvertedType::type converted_type,
+    std::size_t parquet_column_length)
+{
+    if (!isParquetStringTypeSupportedForBloomFilters(logical_type, converted_type))
+    {
+        return std::nullopt;
+    }
+
+    const auto field_type = field.getType();
+
+    if (field_type == Field::Types::Which::IPv6 && parquet_column_length == sizeof(IPv6))
+    {
+        return hashSpecialFLBATypes<IPv6>(field);
+    }
+
+    return tryHashStringWithoutCompatibilityCheck(field);
+}
+
+template <typename T>
+std::optional<uint64_t> tryHashInt(const Field & field, const std::shared_ptr<const parquet::LogicalType> & logical_type, parquet::ConvertedType::type converted_type)
+{
+    if (!isParquetIntegerTypeSupportedForBloomFilters(logical_type, converted_type))
+    {
+        return std::nullopt;
+    }
+
+    parquet::XxHasher hasher;
+
+    if (field.getType() == Field::Types::Which::Int64)
+    {
+        return hasher.Hash(static_cast<T>(field.safeGet<Int64>()));
+    }
+    else if (field.getType() == Field::Types::Which::UInt64)
+    {
+        return hasher.Hash(static_cast<T>(field.safeGet<UInt64>()));
+    }
+    else if (field.getType() == Field::Types::IPv4)
+    {
+        /*
+         * In theory, we could accept IPv4 over 64 bits variables. It would only be a problem in case it was hashed using the byte array api
+         * with a zero-ed buffer that had a 32 bits variable copied into it.
+         *
+         * To be on the safe side, accept only in case physical type is 32 bits.
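+         * (A mismatched hashing width here would produce hashes that differ from the ones the
+         * writer stored; a false "definitely not present" answer would then wrongly skip row
+         * groups and return incorrect query results.)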
+         * */
+        if constexpr (std::is_same_v<T, int32_t>)
+        {
+            return hasher.Hash(static_cast<T>(field.safeGet<IPv4>()));
+        }
+    }
+
+    return std::nullopt;
+}
+
+std::optional<uint64_t> tryHash(const Field & field, const parquet::ColumnDescriptor * parquet_column_descriptor)
+{
+    const auto physical_type = parquet_column_descriptor->physical_type();
+    const auto & logical_type = parquet_column_descriptor->logical_type();
+    const auto converted_type = parquet_column_descriptor->converted_type();
+
+    switch (physical_type)
+    {
+        case parquet::Type::type::INT32:
+            return tryHashInt<int32_t>(field, logical_type, converted_type);
+        case parquet::Type::type::INT64:
+            return tryHashInt<int64_t>(field, logical_type, converted_type);
+        case parquet::Type::type::BYTE_ARRAY:
+            return tryHashString(field, logical_type, converted_type);
+        case parquet::Type::type::FIXED_LEN_BYTE_ARRAY:
+            return tryHashFLBA(field, logical_type, converted_type, parquet_column_descriptor->type_length());
+        default:
+            return std::nullopt;
+    }
+}
+
+std::optional<std::vector<uint64_t>> hash(const IColumn * data_column, const parquet::ColumnDescriptor * parquet_column_descriptor)
+{
+    std::vector<uint64_t> hashes;
+
+    for (size_t i = 0u; i < data_column->size(); i++)
+    {
+        Field f;
+        data_column->get(i, f);
+
+        auto hashed_value = tryHash(f, parquet_column_descriptor);
+
+        if (!hashed_value)
+        {
+            return std::nullopt;
+        }
+
+        hashes.emplace_back(*hashed_value);
+    }
+
+    return hashes;
+}
+
+bool maybeTrueOnBloomFilter(const std::vector<uint64_t> & hashes, const std::unique_ptr<parquet::BloomFilter> & bloom_filter)
+{
+    for (const auto hash : hashes)
+    {
+        if (bloom_filter->FindHash(hash))
+        {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+const parquet::ColumnDescriptor * getColumnDescriptorIfBloomFilterIsPresent(
+    const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata,
+    const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
+    std::size_t clickhouse_column_index)
+{
+    if (clickhouse_column_index_to_parquet_index.size() <= clickhouse_column_index)
+    {
+        return nullptr;
+    }
+
+    const auto & parquet_indexes = clickhouse_column_index_to_parquet_index[clickhouse_column_index].parquet_indexes;
+
+    // complex types like structs, tuples and maps will have more than one index.
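+    // e.g. a ClickHouse Tuple(a Int64, b String) maps to two parquet leaf columns, so its
+    // entry would look like {clickhouse_index = N, parquet_indexes = {i, i + 1}} (illustrative).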
+    // we don't support those for now
+    if (parquet_indexes.size() > 1)
+    {
+        return nullptr;
+    }
+
+    if (parquet_indexes.empty())
+    {
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Something bad happened, raise an issue and try the query with `input_format_parquet_bloom_filter_push_down=false`");
+    }
+
+    auto parquet_column_index = parquet_indexes[0];
+
+    const auto * parquet_column_descriptor = parquet_rg_metadata->schema()->Column(parquet_column_index);
+
+    bool column_has_bloom_filter = parquet_rg_metadata->ColumnChunk(parquet_column_index)->bloom_filter_offset().has_value();
+    if (!column_has_bloom_filter)
+    {
+        return nullptr;
+    }
+
+    return parquet_column_descriptor;
+}
+
+}
+
+ParquetBloomFilterCondition::ParquetBloomFilterCondition(const std::vector<ConditionElement> & condition_, const Block & header_)
+    : condition(condition_), header(header_)
+{
+}
+
+bool ParquetBloomFilterCondition::mayBeTrueOnRowGroup(const ColumnIndexToBF & column_index_to_column_bf) const
+{
+    using Function = ConditionElement::Function;
+    std::vector<BoolMask> rpn_stack;
+
+    for (const auto & element : condition)
+    {
+        if (element.function == Function::FUNCTION_IN
+            || element.function == Function::FUNCTION_NOT_IN)
+        {
+            bool maybe_true = true;
+            for (auto column_index = 0u; column_index < element.hashes_per_column.size(); column_index++)
+            {
+                // in case bloom filter is not present for this row group
+                // https://github.com/ClickHouse/ClickHouse/pull/62966#discussion_r1722361237
+                if (!column_index_to_column_bf.contains(element.key_columns[column_index]))
+                {
+                    rpn_stack.emplace_back(true, true);
+                    continue;
+                }
+
+                bool column_maybe_contains = maybeTrueOnBloomFilter(
+                    element.hashes_per_column[column_index],
+                    column_index_to_column_bf.at(element.key_columns[column_index]));
+
+                if (!column_maybe_contains)
+                {
+                    maybe_true = false;
+                    break;
+                }
+            }
+
+            rpn_stack.emplace_back(maybe_true, true);
+            if (element.function == Function::FUNCTION_NOT_IN)
+                rpn_stack.back() = !rpn_stack.back();
+        }
+        else if (element.function == Function::FUNCTION_NOT)
+        {
+            rpn_stack.back() = !rpn_stack.back();
+        }
+        else if (element.function == Function::FUNCTION_OR)
+        {
+            auto arg1 = rpn_stack.back();
+            rpn_stack.pop_back();
+            auto arg2 = rpn_stack.back();
+            rpn_stack.back() = arg1 | arg2;
+        }
+        else if (element.function == Function::FUNCTION_AND)
+        {
+            auto arg1 = rpn_stack.back();
+            rpn_stack.pop_back();
+            auto arg2 = rpn_stack.back();
+            rpn_stack.back() = arg1 & arg2;
+        }
+        else if (element.function == Function::ALWAYS_TRUE)
+        {
+            rpn_stack.emplace_back(true, false);
+        }
+        else if (element.function == Function::ALWAYS_FALSE)
+        {
+            rpn_stack.emplace_back(false, true);
+        }
+        else
+        {
+            rpn_stack.emplace_back(true, true);
+        }
+    }
+
+    if (rpn_stack.size() != 1)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in ParquetBloomFilterCondition::mayBeTrueOnRowGroup");
+
+    return rpn_stack[0].can_be_true;
+}
+
+std::unordered_set<std::size_t> ParquetBloomFilterCondition::getFilteringColumnKeys() const
+{
+    std::unordered_set<std::size_t> column_keys;
+
+    for (const auto & element : condition)
+    {
+        for (const auto index : element.key_columns)
+        {
+            column_keys.insert(index);
+        }
+    }
+
+    return column_keys;
+}
+
+/*
+ * `KeyCondition::rpn` is overly complex for bloom filters, and some of its operations are not even supported. On top of that, to avoid hashing
+ * every time we loop over an RPN element, we store hashes instead of the raw predicate values. To achieve this, we loop over `KeyCondition::rpn`
+ * and build a simplified RPN that holds hashes instead of values.
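+ * For example (illustrative): `WHERE str_column = 'abc'` arrives here as FUNCTION_IN_RANGE with
+ * range ['abc', 'abc'] and is rewritten to FUNCTION_IN holding xxHash64('abc'), so every row
+ * group probe is a cheap hash lookup instead of a re-hash of the literal.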
+ *
+ * `KeyCondition::RPNElement::FUNCTION_IN_RANGE` becomes:
+ *      `FUNCTION_IN`
+ *      `FUNCTION_UNKNOWN` when range limits are different
+ * `KeyCondition::RPNElement::FUNCTION_IN_SET` becomes
+ *      `FUNCTION_IN`
+ *
+ * Complex types and structs are not supported.
+ * There are two sources of data types being analyzed, and they need to be compatible: DB::Field type and parquet type.
+ * This is determined by the `isParquetStringTypeSupportedForBloomFilters` / `isParquetIntegerTypeSupportedForBloomFilters` helpers.
+ *
+ * Some interesting examples:
+ * 1. file(..., 'str_column UInt64') where str_column = 50; Field.type == UInt64. Parquet type string. Not supported.
+ * 2. file(...) where str_column = 50; Field.type == String (conversion already taken care of by `KeyCondition`). Parquet type string.
+ * 3. file(...) where uint32_column = toIPv4(5). Field.type == IPv4. Incompatible column types, resolved by `KeyCondition` itself.
+ * 4. file(...) where toIPv4(uint32_column) = toIPv4(5). Field.type == IPv4. We know it is safe to hash it using an int32 API.
+ * */
+std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParquetBloomFilterCondition(
+    const std::vector<KeyCondition::RPNElement> & rpn,
+    const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
+    const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata)
+{
+    std::vector<ParquetBloomFilterCondition::ConditionElement> condition_elements;
+
+    using RPNElement = KeyCondition::RPNElement;
+    using Function = ParquetBloomFilterCondition::ConditionElement::Function;
+
+    for (const auto & rpn_element : rpn)
+    {
+        // this would be a problem for `where negate(x) = -58`.
+        // It would perform a bf search on `-58`, and possibly miss row groups containing this data.
+        if (!rpn_element.monotonic_functions_chain.empty())
+        {
+            condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
+            continue;
+        }
+
+        ParquetBloomFilterCondition::ConditionElement::HashesForColumns hashes;
+
+        if (rpn_element.function == RPNElement::FUNCTION_IN_RANGE
+            || rpn_element.function == RPNElement::FUNCTION_NOT_IN_RANGE)
+        {
+            // Only FUNCTION_EQUALS is supported, and for that the range extremes need to be the same
+            if (rpn_element.range.left != rpn_element.range.right)
+            {
+                condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
+                continue;
+            }
+
+            const auto * parquet_column_descriptor =
+                getColumnDescriptorIfBloomFilterIsPresent(parquet_rg_metadata, clickhouse_column_index_to_parquet_index, rpn_element.key_column);
+
+            if (!parquet_column_descriptor)
+            {
+                condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
+                continue;
+            }
+
+            auto hashed_value = tryHash(rpn_element.range.left, parquet_column_descriptor);
+
+            if (!hashed_value)
+            {
+                condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
+                continue;
+            }
+
+            std::vector<uint64_t> hashes_for_column;
+            hashes_for_column.emplace_back(*hashed_value);
+
+            hashes.emplace_back(std::move(hashes_for_column));
+
+            auto function = rpn_element.function == RPNElement::FUNCTION_IN_RANGE
+                ? ParquetBloomFilterCondition::ConditionElement::Function::FUNCTION_IN
+                : ParquetBloomFilterCondition::ConditionElement::Function::FUNCTION_NOT_IN;
+
+            std::vector<std::size_t> key_columns;
+            key_columns.emplace_back(rpn_element.key_column);
+
+            condition_elements.emplace_back(function, std::move(hashes), std::move(key_columns));
+        }
+        else if (rpn_element.function == RPNElement::FUNCTION_IN_SET
+                 || rpn_element.function == RPNElement::FUNCTION_NOT_IN_SET)
+        {
+            const auto & set_index = rpn_element.set_index;
+            const auto & ordered_set = set_index->getOrderedSet();
+            const auto & indexes_mapping = set_index->getIndexesMapping();
+            bool found_empty_column = false;
+
+            std::vector<std::size_t> key_columns;
+
+            for (auto i = 0u; i < ordered_set.size(); i++)
+            {
+                const auto & set_column = ordered_set[i];
+
+                const auto * parquet_column_descriptor = getColumnDescriptorIfBloomFilterIsPresent(
+                    parquet_rg_metadata,
+                    clickhouse_column_index_to_parquet_index,
+                    indexes_mapping[i].key_index);
+
+                if (!parquet_column_descriptor)
+                {
+                    continue;
+                }
+
+                auto column = set_column;
+
+                if (column->empty())
+                {
+                    found_empty_column = true;
+                    break;
+                }
+
+                if (const auto & nullable_column = checkAndGetColumn<ColumnNullable>(set_column.get()))
+                {
+                    column = nullable_column->getNestedColumnPtr();
+                }
+
+                auto hashes_for_column_opt = hash(column.get(), parquet_column_descriptor);
+
+                if (!hashes_for_column_opt)
+                {
+                    continue;
+                }
+
+                auto & hashes_for_column = *hashes_for_column_opt;
+
+                if (hashes_for_column.empty())
+                {
+                    continue;
+                }
+
+                hashes.emplace_back(hashes_for_column);
+
+                key_columns.push_back(indexes_mapping[i].key_index);
+            }
+
+            if (found_empty_column)
+            {
+                condition_elements.emplace_back(Function::ALWAYS_FALSE);
+                continue;
+            }
+
+            if (hashes.empty())
+            {
+                condition_elements.emplace_back(Function::FUNCTION_UNKNOWN);
+                continue;
+            }
+
+            auto function = RPNElement::FUNCTION_IN_SET == rpn_element.function ? Function::FUNCTION_IN : Function::FUNCTION_NOT_IN;
+
+            condition_elements.emplace_back(function, hashes, key_columns);
+        }
+        else if (rpn_element.function == RPNElement::FUNCTION_NOT)
+        {
+            condition_elements.emplace_back(Function::FUNCTION_NOT);
+        }
+        else if (rpn_element.function == RPNElement::FUNCTION_OR)
+        {
+            condition_elements.emplace_back(Function::FUNCTION_OR);
+        }
+        else if (rpn_element.function == RPNElement::FUNCTION_AND)
+        {
+            condition_elements.emplace_back(Function::FUNCTION_AND);
+        }
+        else
+        {
+            condition_elements.emplace_back(Function::ALWAYS_TRUE);
+        }
+    }
+
+    return condition_elements;
+}
+
+}
+
+#endif
diff --git a/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h b/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h
new file mode 100644
index 000000000000..6de6030b23cb
--- /dev/null
+++ b/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h
@@ -0,0 +1,73 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_PARQUET
+
+#include <Core/Block.h>
+#include <Processors/Formats/Impl/ArrowFieldIndexUtil.h>
+#include <Storages/MergeTree/KeyCondition.h>
+
+namespace parquet
+{
+class BloomFilter;
+}
+
+namespace DB
+{
+
+class ParquetBloomFilterCondition
+{
+public:
+
+    struct ConditionElement
+    {
+        enum Function
+        {
+            /// Atoms of a Boolean expression.
+            FUNCTION_IN,
+            FUNCTION_NOT_IN,
+            /// Can take any value.
+            FUNCTION_UNKNOWN,
+            /// Operators of the logical expression.
+            FUNCTION_NOT,
+            FUNCTION_AND,
+            FUNCTION_OR,
+            /// Constants
+            ALWAYS_FALSE,
+            ALWAYS_TRUE,
+        };
+
+        using ColumnPtr = IColumn::Ptr;
+        using HashesForColumns = std::vector<std::vector<uint64_t>>;
+        using KeyColumns = std::vector<std::size_t>;
+
+        Function function;
+        // each entry represents a list of hashes per column
+        // suppose there are three columns with 2 rows each
+        // hashes_per_column.size() == 3 and hashes_per_column[0].size() == 2
+        HashesForColumns hashes_per_column;
+        KeyColumns key_columns;
+    };
+
+    using RPNElement = KeyCondition::RPNElement;
+    using ColumnIndexToBF = std::unordered_map<std::size_t, std::unique_ptr<parquet::BloomFilter>>;
+
+    explicit ParquetBloomFilterCondition(const std::vector<ConditionElement> & condition_, const Block & header_);
+
+    bool mayBeTrueOnRowGroup(const ColumnIndexToBF & column_index_to_column_bf) const;
+    std::unordered_set<std::size_t> getFilteringColumnKeys() const;
+
+private:
+    std::vector<ConditionElement> condition;
+    Block header;
+};
+
+std::vector<ParquetBloomFilterCondition::ConditionElement> keyConditionRPNToParquetBloomFilterCondition(
+    const std::vector<KeyCondition::RPNElement> & rpn,
+    const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
+    const std::unique_ptr<parquet::RowGroupMetaData> & parquet_rg_metadata);
+
+}
+
+#endif
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index 4231ef3c7ab5..bfab82148d32 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -14,6 +14,8 @@
 #include
 #include
 #include
+#include <Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h>
+#include <parquet/bloom_filter.h>
 #include
 #include
 #include "ArrowBufferedStreams.h"
@@ -25,6 +27,7 @@
 #include
 #include
 #include
+#include <parquet/bloom_filter_reader.h>
 #include
 
 namespace CurrentMetrics
@@ -264,6 +267,50 @@ static Field decodePlainParquetValueSlow(const std::string & data, parquet::Type
     return field;
 }
 
+static ParquetBloomFilterCondition::ColumnIndexToBF buildColumnIndexToBF(
+    parquet::BloomFilterReader & bf_reader,
+    int row_group,
+    const std::vector<ArrowFieldIndexUtil::ClickHouseIndexToParquetIndex> & clickhouse_column_index_to_parquet_index,
+    const std::unordered_set<std::size_t> & filtering_columns
+)
+{
+    auto rg_bf = bf_reader.RowGroup(row_group);
+
+    if (!rg_bf)
+    {
+        return {};
+    }
+
+    ParquetBloomFilterCondition::ColumnIndexToBF index_to_column_bf;
+
+    for (const auto & [clickhouse_index, parquet_indexes] : clickhouse_column_index_to_parquet_index)
+    {
+        if (!filtering_columns.contains(clickhouse_index))
+        {
+            continue;
+        }
+
+        // Complex / nested types contain more than one index. We don't support those.
+        if (parquet_indexes.size() > 1)
+        {
+            continue;
+        }
+
+        auto parquet_index = parquet_indexes[0];
+
+        auto bf = rg_bf->GetColumnBloomFilter(parquet_index);
+
+        if (!bf)
+        {
+            continue;
+        }
+
+        index_to_column_bf[clickhouse_index] = std::move(bf);
+    }
+
+    return index_to_column_bf;
+}
+
 /// Range of values for each column, based on statistics in the Parquet metadata.
 /// This is lower/upper bounds, not necessarily exact min and max, e.g. the min/max can be just
 /// missing in the metadata.
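 /// Note: bloom filter pruning (buildColumnIndexToBF above) composes with this min/max pruning
 /// in initializeIfNeeded() below: a row group is read only if both checks say it may contain
 /// matching rows; either check alone can prove a row group irrelevant and skip it.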
@@ -471,9 +518,27 @@ void ParquetBlockInputFormat::initializeIfNeeded()
     ArrowFieldIndexUtil field_util(
         format_settings.parquet.case_insensitive_column_matching,
         format_settings.parquet.allow_missing_columns);
-    column_indices = field_util.findRequiredIndices(getPort().getHeader(), *schema);
+
+    auto index_mapping = field_util.findRequiredIndices(getPort().getHeader(), *schema, *metadata);
+
+    for (const auto & [clickhouse_header_index, parquet_indexes] : index_mapping)
+    {
+        for (auto parquet_index : parquet_indexes)
+        {
+            column_indices.push_back(parquet_index);
+        }
+    }
 
     int num_row_groups = metadata->num_row_groups();
+
+    if (num_row_groups == 0)
+    {
+        return;
+    }
+
+    const auto bf_reader_properties = parquet::default_reader_properties();
+    std::unique_ptr<parquet::BloomFilterReader> bf_reader;
+
     row_group_batches.reserve(num_row_groups);
 
     auto adaptive_chunk_size = [&](int row_group_idx) -> size_t
@@ -494,11 +559,38 @@ void ParquetBlockInputFormat::initializeIfNeeded()
         return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast<size_t>(format_settings.parquet.max_block_size));
     };
 
+    std::unique_ptr<ParquetBloomFilterCondition> parquet_bloom_filter_condition;
+
+    std::unordered_set<std::size_t> filtering_columns;
+
+    if (format_settings.parquet.bloom_filter_push_down && key_condition)
+    {
+        bf_reader = parquet::BloomFilterReader::Make(arrow_file, metadata, bf_reader_properties, nullptr);
+
+        const auto parquet_conditions = keyConditionRPNToParquetBloomFilterCondition(
+            key_condition->getRPN(),
+            index_mapping,
+            metadata->RowGroup(0));
+        parquet_bloom_filter_condition = std::make_unique<ParquetBloomFilterCondition>(parquet_conditions, getPort().getHeader());
+
+        filtering_columns = parquet_bloom_filter_condition->getFilteringColumnKeys();
+    }
+
     for (int row_group = 0; row_group < num_row_groups; ++row_group)
     {
         if (skip_row_groups.contains(row_group))
             continue;
 
+        if (parquet_bloom_filter_condition)
+        {
+            const auto column_index_to_bf = buildColumnIndexToBF(*bf_reader, row_group, index_mapping, filtering_columns);
+
+            if (!parquet_bloom_filter_condition->mayBeTrueOnRowGroup(column_index_to_bf))
+            {
+                continue;
+            }
+        }
+
         if (format_settings.parquet.filter_push_down && key_condition
             && !key_condition
                     ->checkInHyperrectangle(
diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp
index caaaf53b9d46..e54d5237ae75 100644
--- a/src/Storages/MergeTree/KeyCondition.cpp
+++ b/src/Storages/MergeTree/KeyCondition.cpp
@@ -800,7 +800,7 @@ void KeyCondition::getAllSpaceFillingCurves()
 KeyCondition::KeyCondition(
     const ActionsDAG * filter_dag,
     ContextPtr context,
-    const Names & key_column_names,
+    const Names & key_column_names_,
     const ExpressionActionsPtr & key_expr_,
     bool single_point_)
     : key_expr(key_expr_)
@@ -808,7 +808,7 @@ KeyCondition::KeyCondition(
     , single_point(single_point_)
 {
     size_t key_index = 0;
-    for (const auto & name : key_column_names)
+    for (const auto & name : key_column_names_)
     {
         if (!key_columns.contains(name))
         {
diff --git a/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.reference b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.reference
new file mode 100644
index 000000000000..99ede711976d
--- /dev/null
+++ b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.reference
@@ -0,0 +1,347 @@
+1000
+bloom filter is off, all row groups should be read
+expect rows_read = select count()
+{
+  "data": [
+    {
+      "string": "AZSR",
+      "flba": "WNMM"
+    },
+    {
+      "string": "PFJH",
+      "flba": "GKJC"
+    }
+  ],
+  "rows": 2,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+bloom filter is on, some row groups should be skipped
+expect rows_read much less than select count()
+{
+  "data": [
+    {
+      "string": "AZSR",
+      "flba": "WNMM"
+    },
+    {
+      "string": "PFJH",
+      "flba": "GKJC"
+    }
+  ],
+  "rows": 2,
+  "statistics": {
+    "rows_read": 464,
+    "bytes_read": 21703
+  }
+}
+bloom filter is on, but where predicate contains data from 2 row groups out of 3.
+Rows read should be less than select count, but greater than previous selects
+{
+  "data": [
+    {
+      "string": "PFJH",
+      "flba": "GKJC"
+    },
+    {
+      "string": "ZHZK",
+      "flba": "HRWD"
+    }
+  ],
+  "rows": 2,
+  "statistics": {
+    "rows_read": 536,
+    "bytes_read": 25708
+  }
+}
+bloom filter is on, but where predicate contains data from all row groups
+expect rows_read = select count()
+{
+  "data": [
+    {
+      "string": "PFJH",
+      "flba": "GKJC"
+    },
+    {
+      "string": "OKAI",
+      "flba": "UXGT"
+    },
+    {
+      "string": "ZHZK",
+      "flba": "HRWD"
+    }
+  ],
+  "rows": 3,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+IN check
+{
+  "data": [
+    {
+      "string": "PFJH",
+      "flba": "GKJC"
+    },
+    {
+      "string": "ZHZK",
+      "flba": "HRWD"
+    }
+  ],
+  "rows": 2,
+  "statistics": {
+    "rows_read": 536,
+    "bytes_read": 25708
+  }
+}
+tuple in case, bf is off.
+{
+  "data": [
+    {
+      "string": "PFJH",
+      "flba": "GKJC"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+tuple in case, bf is on.
+{
+  "data": [
+    {
+      "string": "PFJH",
+      "flba": "GKJC"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 464,
+    "bytes_read": 21703
+  }
+}
+complex tuple in case, bf is off
+{
+  "data": [
+    {
+      "string": "PFJH",
+      "flba": "GKJC"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+complex tuple in case, bf is on
+{
+  "data": [
+    {
+      "string": "PFJH",
+      "flba": "GKJC"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 464,
+    "bytes_read": 21703
+  }
+}
+complex tuple in case, bf is on. Non existent
+{
+  "data": [],
+  "rows": 0,
+  "statistics": {
+    "rows_read": 0,
+    "bytes_read": 0
+  }
+}
+Bloom filter for json column. BF is off
+{
+  "data": [
+    {
+      "json": "{\"key\":38, \"value\":\"NXONM\"}"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+Bloom filter for json column. BF is on
+{
+  "data": [
+    {
+      "json": "{\"key\":38, \"value\":\"NXONM\"}"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 72,
+    "bytes_read": 4005
+  }
+}
+Bloom filter for ipv4 column. BF is off
+{
+  "data": [
+    {
+      "json": "{\"key\":38, \"value\":\"NXONM\"}"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+Bloom filter for ipv4 column. BF is on
+{
+  "data": [
+    {
+      "json": "{\"key\":38, \"value\":\"NXONM\"}"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 72,
+    "bytes_read": 4005
+  }
+}
+Bloom filter for ipv4 column. BF is on. Specified in the schema
+{
+  "data": [
+    {
+      "ipv4": "0.0.1.143"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 72,
+    "bytes_read": 4005
+  }
+}
+Bloom filter on 64 bit column read as ipv4. We explicitly deny it, should read all rg
+{
+  "data": [
+    {
+      "uint64_logical": "22.230.220.164"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+BF off for parquet uint64 logical type. Should read everything
+{
+  "data": [
+    {
+      "json": "{\"key\":683, \"value\":\"YKCPD\"}"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+BF on for parquet uint64 logical type. Uint64 is stored as a signed int 64, but with logical annotation. Make sure a value greater than int64 can be queried
+{
+  "data": [
+    {
+      "json": "{\"key\":683, \"value\":\"YKCPD\"}"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 464,
+    "bytes_read": 21711
+  }
+}
+Uint16 is stored as physical type int32 with bitwidth = 16 and sign = false. Make sure a value greater than int16 can be queried. BF is on.
+{
+  "data": [
+    {
+      "json": "{\"key\":874, \"value\":\"JENHW\"}"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 464,
+    "bytes_read": 21703
+  }
+}
+BF off for parquet int8 logical type. Should read everything
+{
+  "data": [
+    {
+      "json": "{\"key\":89, \"value\":\"MFIYP\"}"
+    },
+    {
+      "json": "{\"key\":321, \"value\":\"JNOIA\"}"
+    },
+    {
+      "json": "{\"key\":938, \"value\":\"UBMLO\"}"
+    },
+    {
+      "json": "{\"key\":252, \"value\":\"ZVLKF\"}"
+    }
+  ],
+  "rows": 4,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+BF on for parquet int8 logical type. Should skip row groups
+{
+  "data": [
+    {
+      "json": "{\"key\":89, \"value\":\"MFIYP\"}"
+    },
+    {
+      "json": "{\"key\":321, \"value\":\"JNOIA\"}"
+    },
+    {
+      "json": "{\"key\":938, \"value\":\"UBMLO\"}"
+    },
+    {
+      "json": "{\"key\":252, \"value\":\"ZVLKF\"}"
+    }
+  ],
+  "rows": 4,
+  "statistics": {
+    "rows_read": 536,
+    "bytes_read": 25716
+  }
+}
+Invalid column conversion with in operation. String type cannot be hashed against parquet int64 physical type. Should read everything
+{
+  "data": [],
+  "rows": 0,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
+Transformations on key column shall not be allowed. Should read everything
+{
+  "data": [
+    {
+      "uint64_logical": "7711695863945021976"
+    }
+  ],
+  "rows": 1,
+  "statistics": {
+    "rows_read": 1000,
+    "bytes_read": 47419
+  }
+}
diff --git a/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.sh b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.sh
new file mode 100755
index 000000000000..b7d40a1be63a
--- /dev/null
+++ b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down.sh
@@ -0,0 +1,96 @@
+#!/usr/bin/env bash
+# Tags: no-ubsan, no-fasttest
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
"$CUR_DIR"/../shell_config.sh + + +USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}') + +WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}" + +mkdir -p "${WORKING_DIR}" + +DATA_FILE="${CUR_DIR}/data_parquet/multi_column_bf.gz.parquet" + +DATA_FILE_USER_PATH="${WORKING_DIR}/multi_column_bf.gz.parquet" + +cp ${DATA_FILE} ${DATA_FILE_USER_PATH} + +${CLICKHOUSE_CLIENT} --query="select count(*) from file('${DATA_FILE_USER_PATH}', Parquet) SETTINGS use_cache_for_count_from_files=false;" + +echo "bloom filter is off, all row groups should be read" +echo "expect rows_read = select count()" +${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or flba='WNMM' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false" | jq 'del(.meta,.statistics.elapsed)' + +echo "bloom filter is on, some row groups should be skipped" +echo "expect rows_read much less than select count()" +${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or flba='WNMM' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false" | jq 'del(.meta,.statistics.elapsed)' + +echo "bloom filter is on, but where predicate contains data from 2 row groups out of 3." +echo "Rows read should be less than select count, but greater than previous selects" +${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or string='ZHZK' order by uint16_logical asc Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "bloom filter is on, but where predicate contains data from all row groups" +echo "expect rows_read = select count()" +${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string='PFJH' or string='ZHZK' or uint64_logical=18441251162536403933 order by uint16_logical asc Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "IN check" +${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where string in ('PFJH', 'ZHZK') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "tuple in case, bf is off." +${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in ('PFJH', 'GKJC') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "tuple in case, bf is on." 
+${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in ('PFJH', 'GKJC') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "complex tuple in case, bf is off" +${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('PFJH', 'GKJC'), ('NON2', 'NON2')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "complex tuple in case, bf is on" +${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('PFJH', 'GKJC'), ('NON2', 'NON2')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "complex tuple in case, bf is on. Non existent" +${CLICKHOUSE_CLIENT} --query="select string, flba from file('${DATA_FILE_USER_PATH}', Parquet) where (string, flba) in (('NON1', 'NON1'), ('NON2', 'NON2'), ('NON3', 'NON3')) order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "Bloom filter for json column. BF is off" +${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "Bloom filter for json column. BF is on" +${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where json = '{\"key\":38, \"value\":\"NXONM\"}' order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "Bloom filter for ipv4 column. BF is off" +${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "Bloom filter for ipv4 column. BF is on" +${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where ipv4 = IPv4StringToNum('0.0.1.143') order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "Bloom filter for ipv4 column. BF is on. Specified in the schema" +${CLICKHOUSE_CLIENT} --query="select ipv4 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv4 IPv4') where ipv4 = toIPv4('0.0.1.143') order by ipv4 asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "Bloom filter on 64 bit column read as ipv4. 
+echo "Bloom filter on 64 bit column read as ipv4. We explicitly deny it, should read all rg"
+${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet, 'uint64_logical IPv4') where uint64_logical = toIPv4(5552715629697883300) order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+
+echo "BF off for parquet uint64 logical type. Should read everything"
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+
+echo "BF on for parquet uint64 logical type. Uint64 is stored as a signed int 64, but with logical annotation. Make sure a value greater than int64 can be queried"
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint64_logical=18441251162536403933 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+
+echo "Uint16 is stored as physical type int32 with bitwidth = 16 and sign = false. Make sure a value greater than int16 can be queried. BF is on."
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where uint16_logical=65528 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+
+echo "BF off for parquet int8 logical type. Should read everything"
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+
+echo "BF on for parquet int8 logical type. Should skip row groups"
+${CLICKHOUSE_CLIENT} --query="select json from file('${DATA_FILE_USER_PATH}', Parquet) where int8_logical=-126 order by uint16_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+
+echo "Invalid column conversion with in operation. String type cannot be hashed against parquet int64 physical type. Should read everything"
+${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet, 'uint64_logical String') where uint64_logical in ('5') order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+
Should read everything" +${CLICKHOUSE_CLIENT} --query="select uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet) where negate(uint64_logical) = -7711695863945021976 order by uint64_logical asc FORMAT Json SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +rm -rf ${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}/* diff --git a/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down_ipv6.reference b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down_ipv6.reference new file mode 100644 index 000000000000..acb66d986e5f --- /dev/null +++ b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down_ipv6.reference @@ -0,0 +1,76 @@ +bloom filter is off, row groups should be read +expect rows_read = select count() +{ + "data": [ + { + "ipv6": "7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995" + } + ], + "rows": 1, + "statistics": { + "rows_read": 5, + "bytes_read": 128 + } +} +bloom filter is on for ipv6, row groups should also be read since there is only one. Below queries just make sure the data is properly returned +{ + "data": [ + { + "ipv6": "7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995" + } + ], + "rows": 1, + "statistics": { + "rows_read": 5, + "bytes_read": 128 + } +} +{ + "data": [ + { + "ipv6": "7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995" + } + ], + "rows": 1, + "statistics": { + "rows_read": 5, + "bytes_read": 128 + } +} +{ + "data": [ + { + "toIPv6(ipv6)": "7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995" + } + ], + "rows": 1, + "statistics": { + "rows_read": 5, + "bytes_read": 128 + } +} +non existent ipv6, row group should be skipped +{ + "data": [], + "rows": 0, + "statistics": { + "rows_read": 0, + "bytes_read": 0 + } +} +{ + "data": [], + "rows": 0, + "statistics": { + "rows_read": 0, + "bytes_read": 0 + } +} +{ + "data": [], + "rows": 0, + "statistics": { + "rows_read": 5, + "bytes_read": 128 + } +} diff --git a/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down_ipv6.sh b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down_ipv6.sh new file mode 100755 index 000000000000..752e7ed38a58 --- /dev/null +++ b/tests/queries/0_stateless/03036_test_parquet_bloom_filter_push_down_ipv6.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +# Tags: no-ubsan, no-fasttest + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + + +USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}') + +WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}" + +mkdir -p "${WORKING_DIR}" + +DATA_FILE="${CUR_DIR}/data_parquet/ipv6_bloom_filter.gz.parquet" + +DATA_FILE_USER_PATH="${WORKING_DIR}/ipv6_bloom_filter.gz.parquet" + +cp ${DATA_FILE} ${DATA_FILE_USER_PATH} + +echo "bloom filter is off, row groups should be read" +echo "expect rows_read = select count()" +${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = '7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995' Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=false, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +echo "bloom filter is on for ipv6, row groups should also be read since there is only one. 
+echo "bloom filter is on for ipv6, row groups should also be read since there is only one. Below queries just make sure the data is properly returned"
+${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = '7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995' Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = toIPv6('7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995') Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select toIPv6(ipv6) from file('${DATA_FILE_USER_PATH}', Parquet) where ipv6 = toIPv6('7afe:b9d4:e754:4e78:8783:37f5:b2ea:9995') Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+
+echo "non existent ipv6, row group should be skipped"
+${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = 'fafe:b9d4:e754:4e78:8783:37f5:b2ea:9995' Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select ipv6 from file('${DATA_FILE_USER_PATH}', Parquet, 'ipv6 IPv6') where ipv6 = toIPv6('fafe:b9d4:e754:4e78:8783:37f5:b2ea:9995') Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
+${CLICKHOUSE_CLIENT} --query="select toIPv6(ipv6) from file('${DATA_FILE_USER_PATH}', Parquet) where ipv6 = toIPv6('fafe:b9d4:e754:4e78:8783:37f5:b2ea:9995') Format JSON SETTINGS input_format_parquet_bloom_filter_push_down=true, input_format_parquet_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)'
diff --git a/tests/queries/0_stateless/data_parquet/ipv6_bloom_filter.gz.parquet b/tests/queries/0_stateless/data_parquet/ipv6_bloom_filter.gz.parquet
new file mode 100644
index 000000000000..335fddc1f0ad
Binary files /dev/null and b/tests/queries/0_stateless/data_parquet/ipv6_bloom_filter.gz.parquet differ
diff --git a/tests/queries/0_stateless/data_parquet/multi_column_bf.gz.parquet b/tests/queries/0_stateless/data_parquet/multi_column_bf.gz.parquet
new file mode 100644
index 000000000000..bfa0a39062a0
Binary files /dev/null and b/tests/queries/0_stateless/data_parquet/multi_column_bf.gz.parquet differ
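A minimal sketch (not part of the patch) of the per-row-group probe the new code performs, written against the same parquet-cpp APIs used above (`BloomFilterReader::RowGroup`, `GetColumnBloomFilter`, `XxHasher::Hash`, `BloomFilter::FindHash`); the `row_group` and `column_index` arguments stand in for values that `ParquetBlockInputFormat` derives from the ClickHouse-to-parquet index mapping:

```cpp
#include <parquet/bloom_filter.h>
#include <parquet/bloom_filter_reader.h>
#include <parquet/xxhasher.h>

#include <cstdint>
#include <memory>
#include <optional>

/// Returns std::nullopt when the column chunk carries no bloom filter (the caller must then
/// read the row group anyway); otherwise, whether the row group *may* contain `value`.
/// `false` is authoritative: the value is definitely absent and the row group can be skipped.
std::optional<bool> mayContainInt64(
    parquet::BloomFilterReader & bf_reader, int row_group, int column_index, int64_t value)
{
    auto rg_reader = bf_reader.RowGroup(row_group);
    if (!rg_reader)
        return std::nullopt;

    std::unique_ptr<parquet::BloomFilter> bf = rg_reader->GetColumnBloomFilter(column_index);
    if (!bf)
        return std::nullopt;

    // Parquet bloom filters are built over xxHash64 of the physical value, which is why
    // tryHash() above must pick the hashing API based on the column's physical type.
    parquet::XxHasher hasher;
    return bf->FindHash(hasher.Hash(value));
}
```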