Skip to content
77 changes: 52 additions & 25 deletions be/src/format/new_parquet/parquet_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,9 @@ ParquetRowGroupPruneReason BloomFilterPruneReason(
if (bloom_filter_cache == nullptr || column_filter.predicates.empty()) {
return ParquetRowGroupPruneReason::NONE;
}
DCHECK_LT(column_filter.file_column_id, schema.size());
const auto& column_schema = *schema[column_filter.file_column_id];
if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE ||
column_schema.leaf_column_id < 0 ||
!ParquetStatisticsUtils::BloomFilterSupported(column_schema)) {
const auto* column_schema =
ParquetStatisticsUtils::ResolvePredicateLeafSchema(schema, column_filter);
if (column_schema == nullptr || !ParquetStatisticsUtils::BloomFilterSupported(*column_schema)) {
return ParquetRowGroupPruneReason::NONE;
}
for (const auto& column_predicate : column_filter.predicates) {
Expand All @@ -301,11 +299,11 @@ ParquetRowGroupPruneReason BloomFilterPruneReason(
}
}
auto* bloom_filter =
bloom_filter_cache->get(row_group_idx, column_schema.leaf_column_id, pruning_stats);
bloom_filter_cache->get(row_group_idx, column_schema->leaf_column_id, pruning_stats);
if (bloom_filter == nullptr) {
return ParquetRowGroupPruneReason::NONE;
}
return ParquetStatisticsUtils::BloomFilterExcludes(column_schema, column_filter, *bloom_filter)
return ParquetStatisticsUtils::BloomFilterExcludes(*column_schema, column_filter, *bloom_filter)
? ParquetRowGroupPruneReason::BLOOM_FILTER
: ParquetRowGroupPruneReason::NONE;
}
Expand Down Expand Up @@ -486,8 +484,41 @@ segment_v2::ZoneMap to_column_predicate_statistics(const ParquetColumnStatistics
return predicate_statistics;
}

// Finds the direct child of `column_schema` whose field_id equals `field_id`.
// Null entries in `children` are skipped. Returns the first matching child in
// `children` order, or nullptr when none matches. The returned pointer is
// non-owning; the child stays owned by `column_schema`.
const ParquetColumnSchema* find_child_schema_by_field_id(const ParquetColumnSchema& column_schema,
                                                         int32_t field_id) {
    for (const auto& candidate : column_schema.children) {
        if (candidate != nullptr && candidate->field_id == field_id) {
            return candidate.get();
        }
    }
    return nullptr;
}

} // namespace

// Resolves the schema node a predicate filter refers to.
//
// Starts at the top-level column `column_filter.file_column_id` in `schema` and
// then descends through `column_filter.file_child_id_path`, matching each entry
// against the children's field_id.
//
// Returns nullptr when:
//   - file_column_id is negative or not a valid index into `schema`;
//   - the top-level entry, or any child along the path, is missing;
//   - the resolved node is not PRIMITIVE, has a negative leaf_column_id, or has
//     max_repetition_level > 0.
// NOTE(review): the repetition-level rejection presumably excludes leaves that
// sit under a repeated ancestor — confirm against the callers' pruning rules.
//
// The returned pointer is non-owning and points into `schema`.
const ParquetColumnSchema* ParquetStatisticsUtils::ResolvePredicateLeafSchema(
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
        const reader::FileColumnPredicateFilter& column_filter) {
    const auto root_id = column_filter.file_column_id;
    const bool root_in_range = root_id >= 0 && root_id < static_cast<int>(schema.size());
    if (!root_in_range) {
        return nullptr;
    }

    const ParquetColumnSchema* node = schema[root_id].get();
    for (const auto child_field_id : column_filter.file_child_id_path) {
        // Covers both a null top-level entry and a failed lookup on the previous step.
        if (node == nullptr) {
            return nullptr;
        }
        node = find_child_schema_by_field_id(*node, child_field_id);
    }
    if (node == nullptr) {
        return nullptr;
    }

    const bool is_usable_leaf = node->kind == ParquetColumnSchemaKind::PRIMITIVE &&
                                node->leaf_column_id >= 0 && node->max_repetition_level <= 0;
    return is_usable_leaf ? node : nullptr;
}

ParquetColumnStatistics ParquetStatisticsUtils::TransformColumnStatistics(
const ParquetColumnSchema& column_schema,
const std::shared_ptr<::parquet::Statistics>& statistics) {
Expand Down Expand Up @@ -561,28 +592,26 @@ ParquetRowGroupPruneReason ParquetStatisticsUtils::RowGroupPruneReason(
if (column_filter.predicates.empty()) {
return ParquetRowGroupPruneReason::NONE;
}
DCHECK_LT(column_filter.file_column_id, schema.size());
const auto& column_schema = *schema[column_filter.file_column_id];
if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE ||
column_schema.leaf_column_id < 0) {
const auto* column_schema = ResolvePredicateLeafSchema(schema, column_filter);
if (column_schema == nullptr) {
return ParquetRowGroupPruneReason::NONE;
}
DCHECK_LT(column_schema.leaf_column_id, row_group.num_columns());
auto column_chunk = row_group.ColumnChunk(column_schema.leaf_column_id);
DCHECK_LT(column_schema->leaf_column_id, row_group.num_columns());
auto column_chunk = row_group.ColumnChunk(column_schema->leaf_column_id);
if (column_chunk == nullptr) {
return ParquetRowGroupPruneReason::NONE;
}
if (CheckStatistics(column_filter,
TransformColumnStatistics(column_schema, column_chunk->statistics()))) {
TransformColumnStatistics(*column_schema, column_chunk->statistics()))) {
return ParquetRowGroupPruneReason::STATISTICS;
}
if (!supports_dictionary_pruning(column_schema, *column_chunk, column_filter) ||
if (!supports_dictionary_pruning(*column_schema, *column_chunk, column_filter) ||
!is_dictionary_encoded_chunk(*column_chunk)) {
return ParquetRowGroupPruneReason::NONE;
}
OwnedDictionaryWords dict_words;
if (!read_dictionary_words(file_reader, row_group_idx, column_schema.leaf_column_id,
column_schema, &dict_words)) {
if (!read_dictionary_words(file_reader, row_group_idx, column_schema->leaf_column_id,
*column_schema, &dict_words)) {
return ParquetRowGroupPruneReason::NONE;
}
for (const auto& column_predicate : column_filter.predicates) {
Expand Down Expand Up @@ -883,19 +912,17 @@ bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndex
if (column_filter.predicates.empty()) {
return false;
}
DORIS_CHECK(column_filter.file_column_id >= 0);
DORIS_CHECK(column_filter.file_column_id < static_cast<int>(file_schema.size()));
const auto& column_schema = *file_schema[column_filter.file_column_id];
if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE ||
column_schema.descriptor == nullptr || column_schema.leaf_column_id < 0) {
const auto* column_schema =
ParquetStatisticsUtils::ResolvePredicateLeafSchema(file_schema, column_filter);
if (column_schema == nullptr || column_schema->descriptor == nullptr) {
return false;
}

std::shared_ptr<::parquet::ColumnIndex> column_index;
std::shared_ptr<::parquet::OffsetIndex> offset_index;
try {
column_index = row_group->GetColumnIndex(column_schema.leaf_column_id);
offset_index = row_group->GetOffsetIndex(column_schema.leaf_column_id);
column_index = row_group->GetColumnIndex(column_schema->leaf_column_id);
offset_index = row_group->GetOffsetIndex(column_schema->leaf_column_id);
} catch (const ::parquet::ParquetException&) {
return false;
} catch (const std::exception&) {
Expand All @@ -910,7 +937,7 @@ bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndex
const auto page_count = offset_index->page_locations().size();
for (size_t page_idx = 0; page_idx < page_count; ++page_idx) {
ParquetColumnStatistics page_statistics;
if (!build_page_statistics(column_index, column_schema, page_idx, &page_statistics)) {
if (!build_page_statistics(column_index, *column_schema, page_idx, &page_statistics)) {
ranges->clear();
return false;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/format/new_parquet/parquet_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ struct ParquetColumnStatistics {
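// Structure modeled on DuckDB's ParquetStatisticsUtils: first convert the Parquet metadata
// into a unified statistics object, then let the filter/predicate decide whether pruning is
// possible. This layer has no knowledge of the table/global schema.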
struct ParquetStatisticsUtils {
static const ParquetColumnSchema* ResolvePredicateLeafSchema(
const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
const reader::FileColumnPredicateFilter& column_filter);

static ParquetColumnStatistics TransformColumnStatistics(
const ParquetColumnSchema& column_schema,
const std::shared_ptr<::parquet::Statistics>& statistics);
Expand Down
21 changes: 21 additions & 0 deletions be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,27 @@ Status read_nested_leaf_batch(const ArrowLeafReaderContext& context, int64_t bat
std::copy(rep_levels, rep_levels + batch->levels_written, batch->rep_levels.begin());
}

batch->value_indices.resize(static_cast<size_t>(batch->levels_written), -1);
int64_t value_idx = 0;
const bool dense_value_slots = values_written == batch->levels_written;
for (int64_t level_idx = 0; level_idx < batch->levels_written; ++level_idx) {
if (batch->def_levels[level_idx] < value_slot_definition_level ||
batch->rep_levels[level_idx] > value_slot_repetition_level) {
continue;
}
if (dense_value_slots) {
batch->value_indices[static_cast<size_t>(level_idx)] = level_idx;
} else {
if (value_idx >= values_written) {
return Status::Corruption(
"Nested parquet reader returned fewer values than definition levels for "
"column {}",
context.column_name());
}
batch->value_indices[static_cast<size_t>(level_idx)] = value_idx++;
}
}

const auto value_type = remove_nullable(context.data_type());
batch->values_column = value_type->create_column();
if (values_written > 0) {
Expand Down
35 changes: 13 additions & 22 deletions be/src/format/new_parquet/reader/nested_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct NestedScalarBatch {
int16_t value_slot_repetition_level = std::numeric_limits<int16_t>::max();
std::vector<int16_t> def_levels;
std::vector<int16_t> rep_levels;
std::vector<int64_t> value_indices;
MutableColumnPtr values_column;

bool empty() const { return levels_written == 0; }
Expand Down Expand Up @@ -137,25 +138,15 @@ class NestedScalarValueCursor {
void reset(const NestedScalarBatch* batch) {
DORIS_CHECK(batch != nullptr);
_batch = batch;
_next_level_idx = 0;
_next_value_idx = 0;
}

Status value_index(const std::string& column_name, int64_t level_idx, int64_t* value_idx) {
DORIS_CHECK(_batch != nullptr);
DORIS_CHECK(value_idx != nullptr);
DORIS_CHECK(level_idx >= _next_level_idx);
DORIS_CHECK(level_idx < _batch->levels_written);
int64_t computed_value_idx = -1;
while (_next_level_idx <= level_idx) {
if (has_value_slot(_next_level_idx)) {
if (_next_level_idx == level_idx) {
computed_value_idx = _next_value_idx;
}
++_next_value_idx;
}
++_next_level_idx;
}
DORIS_CHECK(level_idx >= 0);
DORIS_CHECK(static_cast<size_t>(level_idx) < _batch->value_indices.size());
const int64_t computed_value_idx = _batch->value_indices[static_cast<size_t>(level_idx)];
if (computed_value_idx < 0) {
return Status::Corruption("Nested parquet value is absent for column {}", column_name);
}
Expand All @@ -170,14 +161,13 @@ class NestedScalarValueCursor {

bool has_value_slot(int64_t level_idx) const {
DORIS_CHECK(_batch != nullptr);
return _batch->def_levels[level_idx] >= _batch->value_slot_definition_level &&
_batch->rep_levels[level_idx] <= _batch->value_slot_repetition_level;
DORIS_CHECK(level_idx >= 0);
DORIS_CHECK(static_cast<size_t>(level_idx) < _batch->value_indices.size());
return _batch->value_indices[static_cast<size_t>(level_idx)] >= 0;
}

private:
const NestedScalarBatch* _batch = nullptr;
int64_t _next_level_idx = 0;
int64_t _next_value_idx = 0;
};

inline void move_nested_scalar_tail(const NestedScalarBatch& src, int64_t start_level,
Expand All @@ -195,17 +185,18 @@ inline void move_nested_scalar_tail(const NestedScalarBatch& src, int64_t start_
dst.rep_levels.assign(src.rep_levels.begin() + start_level, src.rep_levels.end());
dst.value_slot_definition_level = src.value_slot_definition_level;
dst.value_slot_repetition_level = src.value_slot_repetition_level;
dst.value_indices.resize(static_cast<size_t>(dst.levels_written), -1);
dst.values_column = src.values_column->clone_empty();

NestedScalarValueCursor value_cursor(&src);
int64_t values_written = 0;
for (int64_t level_idx = start_level; level_idx < src.levels_written; ++level_idx) {
if (!value_cursor.has_value_slot(level_idx)) {
const int64_t value_idx = src.value_indices[static_cast<size_t>(level_idx)];
if (value_idx < 0) {
continue;
}
int64_t value_idx = -1;
auto status = value_cursor.value_index("overflow", level_idx, &value_idx);
DORIS_CHECK(status.ok());
dst.value_indices[static_cast<size_t>(level_idx - start_level)] = values_written;
dst.values_column->insert_from(*src.values_column, static_cast<size_t>(value_idx));
values_written++;
}
overflow->batch = std::move(dst);
}
Expand Down
Loading
Loading