diff --git a/be/src/format/new_parquet/parquet_reader.cpp b/be/src/format/new_parquet/parquet_reader.cpp index 043f155dd8588f..489e184cd2b889 100644 --- a/be/src/format/new_parquet/parquet_reader.cpp +++ b/be/src/format/new_parquet/parquet_reader.cpp @@ -327,18 +327,47 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* file_ // predicate columns in the file-local block have been materialized. for (const auto& expression_filter : _request->expression_filters) { if (expression_filter.conjunct == nullptr) { - continue; + if (expression_filter.delete_conjunct == nullptr) { + continue; + } + } else { + if (*selected_rows == 0) { + break; + } + IColumn::Filter filter(static_cast(batch_rows), 1); + bool can_filter_all = false; + RETURN_IF_ERROR(expression_filter.conjunct->execute_filter( + file_block, filter.data(), static_cast(batch_rows), false, + &can_filter_all)); + *selected_rows = can_filter_all ? 0 + : _apply_filter_to_selection(filter, selection, + *selected_rows); } if (*selected_rows == 0) { break; } - IColumn::Filter filter(static_cast(batch_rows), 1); - bool can_filter_all = false; - RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(file_block, filter.data(), - static_cast(batch_rows), - false, &can_filter_all)); + if (expression_filter.delete_conjunct == nullptr) { + continue; + } + int result_column_id = -1; + RETURN_IF_ERROR(expression_filter.delete_conjunct->root()->execute( + expression_filter.delete_conjunct.get(), file_block, &result_column_id)); + DORIS_CHECK(result_column_id >= 0 && + result_column_id < static_cast(file_block->columns())); + const auto& delete_filter = assert_cast( + *file_block->get_by_position(result_column_id).column) + .get_data(); + DORIS_CHECK(delete_filter.size() == static_cast(batch_rows)); + IColumn::Filter keep_filter(static_cast(batch_rows), 1); + bool has_kept_row = false; + for (size_t row = 0; row < static_cast(batch_rows); ++row) { + keep_filter[row] = !delete_filter[row]; + has_kept_row |= keep_filter[row] != 0; + } + file_block->erase(result_column_id); *selected_rows = - can_filter_all ? 0 : _apply_filter_to_selection(filter, selection, *selected_rows); + !has_kept_row ? 0 + : _apply_filter_to_selection(keep_filter, selection, *selected_rows); } return Status::OK(); } diff --git a/be/src/format/reader/expr/delete_predicate.cpp b/be/src/format/reader/expr/delete_predicate.cpp index 01844fa8a07069..31c6a057afd213 100644 --- a/be/src/format/reader/expr/delete_predicate.cpp +++ b/be/src/format/reader/expr/delete_predicate.cpp @@ -69,26 +69,26 @@ void DeletePredicate::close(VExprContext* context, FunctionContext::FunctionStat * Row IDs should be generated by file reader as a virtual column in `block`. **/ Status DeletePredicate::execute(VExprContext* context, Block* block, int* result_column_id) const { - if (block->empty()) { - return Status::OK(); - } - DCHECK(_open_finished || block == nullptr); if (_children.size() != 1) { return Status::InternalError(fmt::format( "DeletePredicate should have exactly 1 child expr, but got {}", _children.size())); } int slot = -1; RETURN_IF_ERROR(_children[0]->execute(context, block, &slot)); - const auto count = block->rows(); - auto res_col = ColumnBool::create(block->rows(), 0); const auto& row_ids = assert_cast(*block->get_by_position(slot).column).get_data(); - DCHECK_EQ(row_ids.size(), count); + const auto count = row_ids.size(); + auto res_col = ColumnBool::create(count, 0); if (_deleted_rows.empty()) { block->insert({std::move(res_col), std::make_shared(), expr_name()}); *result_column_id = static_cast(block->get_columns().size() - 1); return Status::OK(); } + if (count == 0) { + block->insert({std::move(res_col), std::make_shared(), expr_name()}); + *result_column_id = static_cast(block->get_columns().size() - 1); + return Status::OK(); + } const int64_t* delete_rows = _deleted_rows.data(); const int64_t* delete_rows_end = delete_rows + _deleted_rows.size(); const int64_t* start_pos = std::lower_bound(delete_rows, delete_rows_end, row_ids[0]); diff --git a/be/src/format/reader/table/paimon_reader.cpp b/be/src/format/reader/table/paimon_reader.cpp index 713d1a97e68983..d5c450b2c0172b 100644 --- a/be/src/format/reader/table/paimon_reader.cpp +++ b/be/src/format/reader/table/paimon_reader.cpp @@ -17,26 +17,39 @@ #include "format/reader/table/paimon_reader.h" +#include +#include + #include "format/table/deletion_vector_reader.h" namespace doris::paimon { -bool PaimonReader::_parse_delete_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc& desc) { +Status PaimonReader::_parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, + DeleteFileDesc* desc, bool* has_delete_file) { + DORIS_CHECK(desc != nullptr); + DORIS_CHECK(has_delete_file != nullptr); + *has_delete_file = false; const auto& table_desc = t_desc.paimon_params; if (!table_desc.__isset.deletion_file) { - return false; + return Status::OK(); } const auto& deletion_file = table_desc.deletion_file; - desc.key.resize(deletion_file.path.size() + sizeof(deletion_file.offset)); - memcpy(desc.key.data(), deletion_file.path.data(), deletion_file.path.size()); - memcpy(desc.key.data() + deletion_file.path.size(), &deletion_file.offset, - sizeof(deletion_file.offset)); - desc.path = deletion_file.path; - desc.start_offset = deletion_file.offset; - desc.size = deletion_file.length + 4; - desc.file_size = -1; - return true; + const std::string key_prefix = "paimon_dv:"; + desc->key.resize(key_prefix.size() + deletion_file.path.size() + sizeof(deletion_file.offset)); + char* key_data = desc->key.data(); + memcpy(key_data, key_prefix.data(), key_prefix.size()); + key_data += key_prefix.size(); + memcpy(key_data, deletion_file.path.data(), deletion_file.path.size()); + key_data += deletion_file.path.size(); + memcpy(key_data, &deletion_file.offset, sizeof(deletion_file.offset)); + desc->path = deletion_file.path; + desc->start_offset = deletion_file.offset; + desc->size = deletion_file.length + 4; + desc->file_size = -1; + desc->format = DeleteFileDesc::Format::PAIMON; + *has_delete_file = true; + return Status::OK(); } } // namespace doris::paimon diff --git a/be/src/format/reader/table/paimon_reader.h b/be/src/format/reader/table/paimon_reader.h index d0f33c7a90c0b6..ce386460a6e681 100644 --- a/be/src/format/reader/table/paimon_reader.h +++ b/be/src/format/reader/table/paimon_reader.h @@ -30,7 +30,8 @@ class PaimonReader final : public reader::TableReader { ~PaimonReader() final = default; protected: - bool _parse_delete_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc& desc) override; + Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc, + bool* has_delete_file) override; }; } // namespace doris::paimon diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp index 86868b97b0bba3..0735cc51f383b7 100644 --- a/be/src/format/reader/table_reader.cpp +++ b/be/src/format/reader/table_reader.cpp @@ -20,16 +20,21 @@ #include #include +#include +#include #include #include +#include "common/cast_set.h" #include "common/status.h" #include "core/assert_cast.h" +#include "exec/common/endian.h" #include "exprs/vslot_ref.h" #include "format/new_parquet/parquet_reader.h" #include "format/reader/column_mapper.h" #include "format/table/deletion_vector_reader.h" #include "io/io_common.h" +#include "roaring/roaring64map.hh" namespace doris::reader { namespace { @@ -66,10 +71,63 @@ void build_table_filters_from_conjunct(const VExprSPtr& conjunct, table_filter.conjunct = VExprContext::create_shared(conjunct); table_filter.slot_ids.assign(slot_ids.begin(), slot_ids.end()); table_filters->push_back(std::move(table_filter)); - return; } } +Status parse_deletion_vector(const char* buf, size_t buffer_size, DeleteFileDesc::Format format, + DeleteRows* delete_rows) { + DORIS_CHECK(buf != nullptr); + DORIS_CHECK(delete_rows != nullptr); + DORIS_CHECK(format == DeleteFileDesc::Format::PAIMON || + format == DeleteFileDesc::Format::ICEBERG); + + const size_t checksum_size = format == DeleteFileDesc::Format::ICEBERG ? 4 : 0; + if (buffer_size < 8 + checksum_size) [[unlikely]] { + return Status::DataQualityError("Deletion vector file size too small: {}", buffer_size); + } + + auto total_length = BigEndian::Load32(buf); + if (total_length + 4 + checksum_size != buffer_size) [[unlikely]] { + return Status::DataQualityError("Deletion vector length mismatch, expected: {}, actual: {}", + total_length + 4 + checksum_size, buffer_size); + } + + constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'}; + if (memcmp(buf + sizeof(total_length), MAGIC_NUMBER, 4) != 0) [[unlikely]] { + return Status::DataQualityError("Deletion vector magic number mismatch"); + } + + const char* bitmap_buf = buf + 8; + const size_t bitmap_size = buffer_size - 8 - checksum_size; + if (format == DeleteFileDesc::Format::PAIMON) { + roaring::Roaring bitmap; + try { + bitmap = roaring::Roaring::readSafe(bitmap_buf, bitmap_size); + } catch (const std::runtime_error& e) { + return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); + } + + delete_rows->reserve(bitmap.cardinality()); + for (auto it = bitmap.begin(); it != bitmap.end(); it++) { + delete_rows->push_back(*it); + } + return Status::OK(); + } + + roaring::Roaring64Map bitmap; + try { + bitmap = roaring::Roaring64Map::readSafe(bitmap_buf, bitmap_size); + } catch (const std::runtime_error& e) { + return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); + } + + delete_rows->reserve(bitmap.cardinality()); + for (auto it = bitmap.begin(); it != bitmap.end(); it++) { + delete_rows->push_back(cast_set(*it)); + } + return Status::OK(); +} + } // namespace std::shared_ptr create_system_properties( @@ -117,10 +175,17 @@ Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request RowDescriptor row_desc; for (const auto& expression_filter : file_request.expression_filters) { if (expression_filter.conjunct == nullptr) { - continue; + if (expression_filter.delete_conjunct == nullptr) { + continue; + } + } else { + RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, row_desc)); + RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state)); + } + if (expression_filter.delete_conjunct != nullptr) { + RETURN_IF_ERROR(expression_filter.delete_conjunct->prepare(_runtime_state, row_desc)); + RETURN_IF_ERROR(expression_filter.delete_conjunct->open(_runtime_state)); } - RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, row_desc)); - RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state)); } return Status::OK(); } @@ -169,12 +234,17 @@ Status TableReader::prepare_split(const SplitReadOptions& options) { _partition_values = std::move(options.partition_values); _current_task = std::make_unique(); _current_task->data_file = create_file_description(options.current_range); + _delete_rows = nullptr; return _parse_delete_predicates(options); } Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) { DeleteFileDesc desc {.fs_name = options.current_range.fs_name}; - if (_parse_delete_file(options.current_range.table_format_params, desc)) { + bool has_delete_file = false; + RETURN_IF_ERROR(_parse_deletion_vector_file(options.current_range.table_format_params, &desc, + &has_delete_file)); + if (has_delete_file) { + DORIS_CHECK(options.cache != nullptr); Status create_status = Status::OK(); _delete_rows = options.cache->get(desc.key, [&]() -> DeleteRows* { @@ -195,45 +265,11 @@ Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) { } const char* buf = buffer.data(); - uint32_t actual_length; - std::memcpy(reinterpret_cast(&actual_length), buf, 4); - std::reverse(reinterpret_cast(&actual_length), - reinterpret_cast(&actual_length) + 4); - buf += 4; - if (actual_length != bytes_read - 4) [[unlikely]] { - create_status = Status::RuntimeError( - "DeletionVector deserialize error: length not match, " - "actual length: {}, expect length: {}", - actual_length, bytes_read - 4); - return nullptr; - } - uint32_t magic_number; - std::memcpy(reinterpret_cast(&magic_number), buf, 4); - std::reverse(reinterpret_cast(&magic_number), - reinterpret_cast(&magic_number) + 4); - buf += 4; - const static uint32_t MAGIC_NUMBER = 1581511376; - if (magic_number != MAGIC_NUMBER) [[unlikely]] { - create_status = Status::RuntimeError( - "DeletionVector deserialize error: invalid magic number {}", magic_number); - return nullptr; - } - - roaring::Roaring roaring_bitmap; SCOPED_TIMER(_profile->parse_delete_file_time); - try { - roaring_bitmap = roaring::Roaring::readSafe(buf, bytes_read - 4); - } catch (const std::runtime_error& e) { - create_status = Status::RuntimeError( - "DeletionVector deserialize error: failed to deserialize roaring bitmap, " - "{}", - e.what()); + create_status = parse_deletion_vector(buf, bytes_read, desc.format, delete_rows); + if (!create_status.ok()) [[unlikely]] { return nullptr; } - delete_rows->reserve(roaring_bitmap.cardinality()); - for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); it++) { - delete_rows->push_back(*it); - } COUNTER_UPDATE(_profile->num_delete_rows, delete_rows->size()); return delete_rows; }); diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h index 5441995e18c35e..f94e98bd83798e 100644 --- a/be/src/format/reader/table_reader.h +++ b/be/src/format/reader/table_reader.h @@ -19,12 +19,14 @@ #include +#include #include #include #include #include #include +#include "common/cast_set.h" #include "common/status.h" #include "core/assert_cast.h" #include "core/block/block.h" @@ -32,11 +34,14 @@ #include "core/data_type/data_type_array.h" #include "core/data_type/data_type_map.h" #include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_number.h" #include "core/data_type/data_type_struct.h" #include "exprs/vexpr_context.h" #include "exprs/vexpr_fwd.h" +#include "format/new_parquet/column_reader.h" #include "format/reader/column_mapper.h" #include "format/reader/expr/delete_predicate.h" +#include "format/reader/expr/slot_ref.h" #include "format/reader/file_reader.h" #include "runtime/descriptors.h" @@ -179,19 +184,9 @@ class TableReader { } continue; } - DCHECK_EQ(_data_reader.block_template.columns(), _data_reader.scan_schema.size()); - + DCHECK_EQ(_data_reader.block_template.columns(), _data_reader.block_schema.size()); DORIS_CHECK(block->columns() == _data_reader.column_mapper.mappings().size()); - size_t idx = 0; - for (const auto& mapping : _data_reader.column_mapper.mappings()) { - ColumnPtr column; - RETURN_IF_ERROR(_materialize_mapping_column(mapping, &_data_reader.block_template, - current_rows, &column)); - block->replace_by_position(idx, std::move(column)); - idx++; - } - RETURN_IF_ERROR(finalize_chunk(block)); - RETURN_IF_ERROR(materialize_virtual_columns(block)); + RETURN_IF_ERROR(finalize_chunk(block, current_rows)); if (current_eof) { RETURN_IF_ERROR(close_current_reader()); } @@ -209,9 +204,13 @@ class TableReader { } protected: - virtual bool _parse_delete_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc& desc) { - return false; + // Parse deletion vector information from table format specific file description. + virtual Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, + DeleteFileDesc* desc, bool* has_delete_file) { + *has_delete_file = false; + return Status::OK(); } + // 切换到下一个 reader 的通用流程。 // 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。 Status create_next_reader(bool* eos); @@ -219,36 +218,45 @@ class TableReader { // 打开当前具体 reader。 // 子类在这里基于当前 split/task 初始化底层 FileReader。 virtual Status open_reader() { + // 1. Get file schema and create column mapping. std::vector file_schema; RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema)); - _data_reader.block_schema = file_schema; + _data_reader.file_schema = file_schema; RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns, _partition_values, file_schema)); DORIS_CHECK(_data_reader.column_mapper.mappings().size() == _projected_columns.size()); + + // 2. Build table filters based on conjuncts and column predicates. RETURN_IF_ERROR(_build_table_filters_from_conjuncts()); + // 3. Create file scan request based on column mapping and table filters, then open file reader with the request. + // file scan request is the main carrier of file-level pruning information, including column mapping, column-level filters and expression filters. The file reader will evaluate the filters and only return rows that satisfy the filters to table reader. auto file_request = std::make_unique(); RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request( _table_filters, _table_column_predicates, _projected_columns, file_request.get())); RETURN_IF_ERROR(customize_file_scan_request(file_request.get())); RETURN_IF_ERROR(_open_local_filter_exprs(*file_request)); - _data_reader.scan_schema.clear(); + _data_reader.block_schema.clear(); _data_reader.block_template.clear(); - _data_reader.scan_schema.resize(file_request->column_positions.size()); + _data_reader.block_schema.resize(file_request->column_positions.size()); + + // 4. Build block schema based on file schema and column mapping. The scan schema describes the column layout of the block returned by file reader, which is determined by the column mapping and file schema. for (const auto& [file_column_id, block_position] : file_request->column_positions) { - DORIS_CHECK(block_position < _data_reader.scan_schema.size()); - const auto* field = _find_schema_field(_data_reader.block_schema, file_column_id); + DORIS_CHECK(block_position < _data_reader.block_schema.size()); + const auto* field = _find_schema_field(_data_reader.file_schema, file_column_id); DORIS_CHECK(field != nullptr); auto projection_it = file_request->complex_projections.find(file_column_id); if (projection_it == file_request->complex_projections.end()) { - _data_reader.scan_schema[block_position] = *field; + _data_reader.block_schema[block_position] = *field; } else { RETURN_IF_ERROR(_project_schema_field(*field, projection_it->second, - &_data_reader.scan_schema[block_position])); + &_data_reader.block_schema[block_position])); } } - _data_reader.block_template.reserve(_data_reader.scan_schema.size()); - for (const auto& field : _data_reader.scan_schema) { + + // 5. Prepare block template based on block schema. The block template is used to store the block returned by file reader before finalize; it has the same column layout as the file reader output block, which is determined by the column mapping and file schema. + _data_reader.block_template.reserve(_data_reader.block_schema.size()); + for (const auto& field : _data_reader.block_schema) { _data_reader.block_template.insert( {field.type->create_column(), field.type, field.name}); } @@ -261,6 +269,68 @@ class TableReader { Status _open_local_filter_exprs(const FileScanRequest& file_request); virtual Status customize_file_scan_request(FileScanRequest* file_request) { + return _append_delete_predicate(file_request); + } + + static size_t _next_block_position(const FileScanRequest& request) { + size_t next_position = 0; + for (const auto& [_, block_position] : request.column_positions) { + next_position = std::max(next_position, block_position + 1); + } + return next_position; + } + + void _append_file_scan_column(FileScanRequest* request, ColumnId column_id, + std::vector* scan_columns) { + DORIS_CHECK(request != nullptr); + DORIS_CHECK(scan_columns != nullptr); + if (scan_columns == &request->non_predicate_columns && + std::find(request->predicate_columns.begin(), request->predicate_columns.end(), + column_id) != request->predicate_columns.end()) { + return; + } + const bool newly_added = request->column_positions.count(column_id) == 0; + if (newly_added) { + request->column_positions.emplace(column_id, _next_block_position(*request)); + scan_columns->push_back(column_id); + } else if (std::find(scan_columns->begin(), scan_columns->end(), column_id) == + scan_columns->end()) { + scan_columns->push_back(column_id); + } + if (scan_columns == &request->predicate_columns) { + request->non_predicate_columns.erase( + std::remove(request->non_predicate_columns.begin(), + request->non_predicate_columns.end(), column_id), + request->non_predicate_columns.end()); + } + if (column_id == doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID && + _find_schema_field(_data_reader.file_schema, column_id) == nullptr) { + _data_reader.file_schema.push_back( + doris::parquet::ParquetColumnReaderFactory::row_position_schema_field()); + } + } + + // Append DeletePredicate to file scan request if there are deletes. The predicate will be evaluated in file reader level and filter out deleted rows before returning data to table reader. + Status _append_delete_predicate(FileScanRequest* request) { + DORIS_CHECK(request != nullptr); + if (_delete_rows == nullptr || _delete_rows->empty()) { + return Status::OK(); + } + const auto row_position_column_id = + parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID; + _append_file_scan_column(request, row_position_column_id, &request->predicate_columns); + + auto delete_predicate = std::make_shared(*_delete_rows); + const auto block_position = request->column_positions.at(row_position_column_id); + delete_predicate->add_child(TableSlotRef::create_shared( + cast_set(block_position), cast_set(block_position), -1, + std::make_shared(), + parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME)); + + FileExpressionFilter delete_filter; + delete_filter.delete_conjunct = VExprContext::create_shared(std::move(delete_predicate)); + delete_filter.file_column_ids.push_back(row_position_column_id); + request->expression_filters.push_back(std::move(delete_filter)); return Status::OK(); } @@ -272,27 +342,32 @@ class TableReader { _data_reader.column_mapper.clear(); _table_filters.clear(); _table_column_predicates.clear(); + _data_reader.file_schema.clear(); _data_reader.block_schema.clear(); - _data_reader.scan_schema.clear(); _data_reader.block_template.clear(); _current_task.reset(); return Status::OK(); } - // 将 file-local block 转换为 table/global schema block。 - // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列 - // 物化以及复杂列 remap。 - virtual Status finalize_chunk(Block* block) { return Status::OK(); } - - // 物化虚拟列。 - // 例如 _row_id、_last_updated_sequence_number 等,它们不来自文件物理列。 - virtual Status materialize_virtual_columns(Block* table_block) { - // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。 + // Finalize file-local block to table/global schema block. + virtual Status finalize_chunk(Block* block, const size_t rows) { + size_t idx = 0; + for (const auto& mapping : _data_reader.column_mapper.mappings()) { + ColumnPtr column; + RETURN_IF_ERROR(_materialize_mapping_column(mapping, &_data_reader.block_template, rows, + &column)); + block->replace_by_position(idx, std::move(column)); + idx++; + } + RETURN_IF_ERROR(materialize_virtual_columns(block)); return Status::OK(); } + // Materialize virtual columns in table block, such as _row_id and _last_updated_sequence_number in Iceberg. This is called after finalize_chunk, so the virtual column can be referenced in finalize_expr. + virtual Status materialize_virtual_columns(Block* table_block) { return Status::OK(); } + Status _materialize_mapping_column(const ColumnMapping& mapping, Block* current_block, - size_t current_rows, ColumnPtr* column) { + const size_t rows, ColumnPtr* column) { if (mapping.projection != nullptr) { int res_id; RETURN_IF_ERROR(mapping.projection->execute(current_block, &res_id)); @@ -300,23 +375,22 @@ class TableReader { return Status::OK(); } if (mapping.default_expr != nullptr) { - if (current_block->rows() == current_rows) { + if (current_block->rows() == rows) { int res_id; RETURN_IF_ERROR(mapping.default_expr->execute(current_block, &res_id)); *column = current_block->get_columns()[res_id]; } else { DORIS_CHECK(mapping.is_constant); Block eval_block; - eval_block.insert( - {mapping.table_type->create_column_const_with_default_value(current_rows), - mapping.table_type, "__table_reader_const_rows"}); + eval_block.insert({mapping.table_type->create_column_const_with_default_value(rows), + mapping.table_type, "__table_reader_const_rows"}); int res_id; RETURN_IF_ERROR(mapping.default_expr->execute(&eval_block, &res_id)); *column = eval_block.get_columns()[res_id]; } return Status::OK(); } - *column = mapping.table_type->create_column_const_with_default_value(current_rows); + *column = mapping.table_type->create_column_const_with_default_value(rows); return Status::OK(); } @@ -338,8 +412,10 @@ class TableReader { struct DataReader { std::unique_ptr reader; TableColumnMapper column_mapper; - std::vector block_schema; - std::vector scan_schema; + std::vector + file_schema; // Schema of the data file, also including virtual column (row position). + std::vector + block_schema; // Schema of the block returned by file reader, determined by column mapping and file schema. It is used for file reader to materialize columns into correct type and position. Block block_template; }; DataReader _data_reader; @@ -352,8 +428,8 @@ class TableReader { TableColumnPredicates _table_column_predicates; VExprContext _conjuncts {nullptr}; std::unique_ptr _profile; - // Parsed from DELETION_VECTOR in Iceberg and Paimon - DeleteRows* _delete_rows; + // Parsed from row-position based delete files, including position delete and deletion vector. + DeleteRows* _delete_rows = nullptr; TFileScanRangeParams* _scan_params; std::shared_ptr _io_ctx; RuntimeState* _runtime_state; @@ -451,6 +527,7 @@ class TableReader { return Status::OK(); } + // Parse row-position deletes from table format specific parameters, and fill in _delete_rows. Status _parse_delete_predicates(const SplitReadOptions& options); }; diff --git a/be/src/format/table/deletion_vector_reader.h b/be/src/format/table/deletion_vector_reader.h index b030f048415bf1..968344a8496bc7 100644 --- a/be/src/format/table/deletion_vector_reader.h +++ b/be/src/format/table/deletion_vector_reader.h @@ -37,6 +37,11 @@ struct IOContext; namespace doris { struct DeleteFileDesc { + enum class Format { + PAIMON, + ICEBERG, + }; + std::string key = ""; std::string path = ""; std::string fs_name = ""; @@ -44,6 +49,7 @@ struct DeleteFileDesc { int64_t size = 0; int64_t file_size = -1; int64_t modification_time = 0; + Format format = Format::PAIMON; }; class DeletionVectorReader { diff --git a/be/src/format/table/iceberg_reader_v2.cpp b/be/src/format/table/iceberg_reader_v2.cpp index 220f153e93fc67..ad72313cc89990 100644 --- a/be/src/format/table/iceberg_reader_v2.cpp +++ b/be/src/format/table/iceberg_reader_v2.cpp @@ -17,4 +17,398 @@ #include "format/table/iceberg_reader_v2.h" -namespace doris::iceberg {} // namespace doris::iceberg +#include +#include +#include +#include + +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_const.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/define_primitive_type.h" +#include "core/field.h" +#include "format/new_parquet/column_reader.h" +#include "format/new_parquet/parquet_reader.h" +#include "format/reader/table_reader.h" +#include "format/table/deletion_vector_reader.h" +#include "io/file_factory.h" + +namespace doris::iceberg { + +IcebergTableReader::PositionDeleteBlockCollector::PositionDeleteBlockCollector( + std::string data_file_path, std::map* rows) + : _data_file_path(std::move(data_file_path)), _rows(rows) {} + +Status IcebergTableReader::PositionDeleteBlockCollector::collect(const Block& block, + size_t read_rows) { + if (read_rows == 0) { + return Status::OK(); + } + const auto& file_path_column = assert_cast( + *block.get_by_position(ICEBERG_FILE_PATH_BLOCK_POSITION).column); + const auto& pos_column = + assert_cast(*block.get_by_position(ICEBERG_ROW_POS_BLOCK_POSITION) + .column); + for (size_t row = 0; row < read_rows; ++row) { + const auto file_path = file_path_column.get_data_at(row).to_string(); + if (file_path == _data_file_path) { + (*_rows)[file_path].push_back(pos_column.get_element(row)); + } + } + return Status::OK(); +} + +Status IcebergTableReader::prepare_split(const reader::SplitReadOptions& options) { + _row_lineage_columns = {}; + _iceberg_params = nullptr; + _delete_predicates_initialized = false; + _position_delete_rows_storage.clear(); + _equality_delete_files.clear(); + if (options.current_range.__isset.table_format_params && + options.current_range.table_format_params.__isset.iceberg_params) { + const auto& iceberg_params = options.current_range.table_format_params.iceberg_params; + _iceberg_params = &iceberg_params; + if (iceberg_params.__isset.first_row_id) { + _row_lineage_columns.first_row_id = iceberg_params.first_row_id; + } + if (iceberg_params.__isset.last_updated_sequence_number) { + _row_lineage_columns.last_updated_sequence_number = + iceberg_params.last_updated_sequence_number; + } + } + RETURN_IF_ERROR(TableReader::prepare_split(options)); + return _collect_position_delete_rows(options.current_range.table_format_params); +} + +Status IcebergTableReader::finalize_chunk(Block* block, const size_t rows) { + RETURN_IF_ERROR(reader::TableReader::finalize_chunk(block, rows)); + RETURN_IF_ERROR(apply_equality_deletes(block)); + return Status::OK(); +} + +Status IcebergTableReader::materialize_virtual_columns(Block* table_block) { + for (size_t column_idx = 0; column_idx < _data_reader.column_mapper.mappings().size(); + ++column_idx) { + const auto& mapping = _data_reader.column_mapper.mappings()[column_idx]; + switch (mapping.virtual_column_type) { + case reader::TableVirtualColumnType::ROW_ID: + RETURN_IF_ERROR(_materialize_row_lineage_row_id(table_block, column_idx)); + break; + case reader::TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER: + RETURN_IF_ERROR( + _materialize_row_lineage_last_updated_sequence_number(table_block, column_idx)); + break; + case reader::TableVirtualColumnType::INVALID: + break; + } + } + return Status::OK(); +} + +Status IcebergTableReader::customize_file_scan_request(reader::FileScanRequest* file_request) { + RETURN_IF_ERROR(TableReader::customize_file_scan_request(file_request)); + if (_row_lineage_columns.first_row_id >= 0 && _need_row_lineage_row_id()) { + RETURN_IF_ERROR(_append_row_position_output_column(file_request)); + } + return Status::OK(); +} + +Status IcebergTableReader::_parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, + DeleteFileDesc* desc, + bool* has_delete_file) { + DORIS_CHECK(desc != nullptr); + DORIS_CHECK(has_delete_file != nullptr); + *has_delete_file = false; + if (!t_desc.__isset.iceberg_params) { + return Status::OK(); + } + const auto& iceberg_params = t_desc.iceberg_params; + if (!iceberg_params.__isset.format_version || + iceberg_params.format_version < MIN_SUPPORT_DELETE_FILES_VERSION || + !iceberg_params.__isset.delete_files || iceberg_params.delete_files.empty()) { + return Status::OK(); + } + + const TIcebergDeleteFileDesc* deletion_vector = nullptr; + for (const auto& delete_file : iceberg_params.delete_files) { + if (!delete_file.__isset.content || delete_file.content != DELETION_VECTOR) { + continue; + } + if (deletion_vector != nullptr) { + return Status::DataQualityError("This iceberg data file has multiple DVs."); + } + deletion_vector = &delete_file; + } + if (deletion_vector == nullptr) { + return Status::OK(); + } + if (!deletion_vector->__isset.content_offset || + !deletion_vector->__isset.content_size_in_bytes) { + return Status::InternalError("Deletion vector is missing content offset or length"); + } + + desc->key = _iceberg_delete_vector_cache_key(*deletion_vector); + desc->path = deletion_vector->path; + desc->start_offset = deletion_vector->content_offset; + desc->size = deletion_vector->content_size_in_bytes; + desc->file_size = -1; + desc->format = DeleteFileDesc::Format::ICEBERG; + *has_delete_file = true; + return Status::OK(); +} + +Status IcebergTableReader::_collect_position_delete_rows(const TTableFormatFileDesc& t_desc) { + if (!t_desc.__isset.iceberg_params || _delete_predicates_initialized) { + _delete_predicates_initialized = true; + return Status::OK(); + } + const auto& iceberg_params = t_desc.iceberg_params; + if (!iceberg_params.__isset.format_version || + iceberg_params.format_version < MIN_SUPPORT_DELETE_FILES_VERSION || + !iceberg_params.__isset.delete_files || iceberg_params.delete_files.empty()) { + _delete_predicates_initialized = true; + return Status::OK(); + } + + std::vector position_delete_files; + for (const auto& delete_file : iceberg_params.delete_files) { + if (!delete_file.__isset.content) { + continue; + } + if (delete_file.content == POSITION_DELETE) { + position_delete_files.push_back(delete_file); + } else if (delete_file.content == EQUALITY_DELETE) { + _equality_delete_files.push_back(delete_file); + } + } + + if (_delete_rows != nullptr) { + _position_delete_rows_storage = *_delete_rows; + _delete_rows = &_position_delete_rows_storage; + } + if (!position_delete_files.empty()) { + RETURN_IF_ERROR(_read_position_delete_files(position_delete_files)); + } + + _delete_predicates_initialized = true; + return Status::OK(); +} + +Status IcebergTableReader::apply_equality_deletes(Block* block) { + if (!_equality_delete_files.empty()) { + return Status::NotSupported("Iceberg equality delete is not supported by TableReader"); + } + return Status::OK(); +} + +std::string IcebergTableReader::_iceberg_delete_vector_cache_key( + const TIcebergDeleteFileDesc& delete_file) { + const std::string key_prefix = "iceberg_dv:"; + std::string key; + key.resize(key_prefix.size() + delete_file.path.size() + sizeof(delete_file.content_offset) + + sizeof(delete_file.content_size_in_bytes)); + char* data = key.data(); + memcpy(data, key_prefix.data(), key_prefix.size()); + data += key_prefix.size(); + memcpy(data, delete_file.path.data(), delete_file.path.size()); + data += delete_file.path.size(); + memcpy(data, &delete_file.content_offset, sizeof(delete_file.content_offset)); + data += sizeof(delete_file.content_offset); + memcpy(data, &delete_file.content_size_in_bytes, sizeof(delete_file.content_size_in_bytes)); + return key; +} + +std::shared_ptr IcebergTableReader::_delete_file_system_properties( + const TFileScanRangeParams& scan_params) { + auto system_properties = std::make_shared(); + system_properties->system_type = + scan_params.__isset.file_type ? scan_params.file_type : TFileType::FILE_LOCAL; + system_properties->properties = scan_params.properties; + system_properties->hdfs_params = scan_params.hdfs_params; + if (scan_params.__isset.broker_addresses) { + system_properties->broker_addresses.assign(scan_params.broker_addresses.begin(), + scan_params.broker_addresses.end()); + } + return system_properties; +} + +std::unique_ptr IcebergTableReader::_delete_file_description( + const TFileRangeDesc& range) { + auto file_description = std::make_unique(); + file_description->path = range.path; + file_description->file_size = range.__isset.file_size ? range.file_size : -1; + file_description->range_start_offset = range.__isset.start_offset ? range.start_offset : 0; + file_description->range_size = range.__isset.size ? range.size : -1; + if (range.__isset.fs_name) { + file_description->fs_name = range.fs_name; + } + return file_description; +} + +const reader::SchemaField* IcebergTableReader::_find_delete_field( + const std::vector& schema, const std::string& name) { + for (const auto& field : schema) { + if (field.name == name) { + return &field; + } + } + return nullptr; +} + +Block IcebergTableReader::_build_position_delete_block(const reader::SchemaField& file_path_field, + const reader::SchemaField& pos_field) { + Block block; + block.insert({file_path_field.type->create_column(), file_path_field.type, ICEBERG_FILE_PATH}); + block.insert({pos_field.type->create_column(), pos_field.type, ICEBERG_ROW_POS}); + return block; +} + +Status IcebergTableReader::_append_row_position_output_column(reader::FileScanRequest* request) { + const auto row_position_column_id = + doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID; + _append_file_scan_column(request, row_position_column_id, &request->non_predicate_columns); + _row_position_block_position = request->column_positions.at(row_position_column_id); + return Status::OK(); +} + +std::string IcebergTableReader::_data_file_path() const { + if (_iceberg_params != nullptr && _iceberg_params->__isset.original_file_path) { + return _iceberg_params->original_file_path; + } + DORIS_CHECK(_current_task != nullptr); + DORIS_CHECK(_current_task->data_file != nullptr); + return _current_task->data_file->path; +} + +Status IcebergTableReader::_read_parquet_position_delete_file( + const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& scan_params, + IcebergDeleteFileIOContext* delete_io_ctx, PositionDeleteBlockCollector* collector) { + if (!delete_file.__isset.file_format) { + return Status::InternalError("Iceberg position delete file is missing file format"); + } + if (delete_file.file_format == TFileFormatType::FORMAT_ORC) { + return Status::NotSupported("Iceberg ORC position delete file is not supported"); + } + if (delete_file.file_format != TFileFormatType::FORMAT_PARQUET) { + return Status::NotSupported("Unsupported Iceberg delete file format {}", + delete_file.file_format); + } + + auto delete_range = build_iceberg_delete_file_range(delete_file.path); + if (_current_task != nullptr && _current_task->data_file != nullptr && + !_current_task->data_file->fs_name.empty()) { + delete_range.__set_fs_name(_current_task->data_file->fs_name); + } + auto system_properties = _delete_file_system_properties(scan_params); + auto file_description = _delete_file_description(delete_range); + std::shared_ptr io_ctx(&delete_io_ctx->io_ctx, [](io::IOContext*) {}); + parquet::ParquetReader reader(system_properties, file_description, io_ctx, _scanner_profile); + RETURN_IF_ERROR(reader.init(_runtime_state)); + + std::vector schema; + RETURN_IF_ERROR(reader.get_schema(&schema)); + const auto* file_path_field = _find_delete_field(schema, ICEBERG_FILE_PATH); + const auto* pos_field = _find_delete_field(schema, ICEBERG_ROW_POS); + if (file_path_field == nullptr || pos_field == nullptr) { + return Status::InternalError("Position delete parquet file is missing required columns"); + } + + auto request = std::make_unique(); + request->non_predicate_columns = {file_path_field->id, pos_field->id}; + request->column_positions = { + {file_path_field->id, ICEBERG_FILE_PATH_BLOCK_POSITION}, + {pos_field->id, ICEBERG_ROW_POS_BLOCK_POSITION}, + }; + RETURN_IF_ERROR(reader.open(request)); + + bool eof = false; + while (!eof) { + Block block = _build_position_delete_block(*file_path_field, *pos_field); + size_t read_rows = 0; + RETURN_IF_ERROR(reader.get_block(&block, &read_rows, &eof)); + RETURN_IF_ERROR(collector->collect(block, read_rows)); + } + return reader.close(); +} + +Status IcebergTableReader::_read_position_delete_files( + const std::vector& delete_files) { + TFileScanRangeParams delete_scan_params = + _scan_params == nullptr ? TFileScanRangeParams() : *_scan_params; + std::map rows_by_file; + const auto data_file_path = _data_file_path(); + IcebergDeleteFileIOContext delete_io_ctx(_runtime_state); + PositionDeleteBlockCollector collector(data_file_path, &rows_by_file); + for (const auto& delete_file : delete_files) { + RETURN_IF_ERROR(_read_parquet_position_delete_file(delete_file, delete_scan_params, + &delete_io_ctx, &collector)); + } + auto rows_it = rows_by_file.find(data_file_path); + if (rows_it == rows_by_file.end()) { + return Status::OK(); + } + // Position delete files and deletion vectors both become row-position deletes for the + // common TableReader DeletePredicate path. Keep the merged rows in a member vector because + // DeletePredicate stores a reference to the vector used by _delete_rows. + _position_delete_rows_storage.insert(_position_delete_rows_storage.end(), + rows_it->second.begin(), rows_it->second.end()); + std::sort(_position_delete_rows_storage.begin(), _position_delete_rows_storage.end()); + _position_delete_rows_storage.erase(std::unique(_position_delete_rows_storage.begin(), + _position_delete_rows_storage.end()), + _position_delete_rows_storage.end()); + _delete_rows = &_position_delete_rows_storage; + return Status::OK(); +} + +Status IcebergTableReader::_materialize_row_lineage_row_id(Block* table_block, size_t column_idx) { + if (_row_lineage_columns.first_row_id < 0) { + return Status::OK(); + } + DORIS_CHECK(_row_position_block_position < _data_reader.block_template.columns()); + const auto& row_position_column = assert_cast( + *_data_reader.block_template.get_by_position(_row_position_block_position).column); + DORIS_CHECK(row_position_column.size() == table_block->rows()); + auto column = + table_block->get_by_position(column_idx).column->convert_to_full_column_if_const() + ->assume_mutable(); + auto* nullable_column = assert_cast(column.get()); + auto& null_map = nullable_column->get_null_map_data(); + auto& data = assert_cast(*nullable_column->get_nested_column_ptr()).get_data(); + null_map.resize(row_position_column.size()); + std::fill(null_map.begin(), null_map.end(), 0); + data.resize(row_position_column.size()); + for (size_t row = 0; row < row_position_column.size(); ++row) { + data[row] = _row_lineage_columns.first_row_id + row_position_column.get_element(row); + } + table_block->replace_by_position(column_idx, std::move(column)); + return Status::OK(); +} + +Status IcebergTableReader::_materialize_row_lineage_last_updated_sequence_number( + Block* table_block, size_t column_idx) { + if (_row_lineage_columns.last_updated_sequence_number < 0) { + return Status::OK(); + } + const auto rows = table_block->rows(); + auto data_column = table_block->get_by_position(column_idx).type->create_column(); + data_column->insert( + Field::create_field(_row_lineage_columns.last_updated_sequence_number)); + auto column = ColumnConst::create(std::move(data_column), rows); + table_block->replace_by_position(column_idx, std::move(column)); + return Status::OK(); +} + +bool IcebergTableReader::_need_row_lineage_row_id() const { + for (const auto& mapping : _data_reader.column_mapper.mappings()) { + if (mapping.virtual_column_type == reader::TableVirtualColumnType::ROW_ID) { + return true; + } + } + return false; +} + +} // namespace doris::iceberg diff --git a/be/src/format/table/iceberg_reader_v2.h b/be/src/format/table/iceberg_reader_v2.h index 6c6f4416717eb5..fbc8e28441b661 100644 --- a/be/src/format/table/iceberg_reader_v2.h +++ b/be/src/format/table/iceberg_reader_v2.h @@ -19,26 +19,24 @@ #include #include +#include #include #include -#include #include #include "common/status.h" -#include "core/assert_cast.h" -#include "core/block/block.h" -#include "core/column/column_const.h" -#include "core/column/column_nullable.h" -#include "core/column/column_vector.h" -#include "core/data_type/define_primitive_type.h" -#include "core/field.h" -#include "format/new_parquet/column_reader.h" #include "format/reader/file_reader.h" #include "format/reader/table_reader.h" +#include "format/table/iceberg_delete_file_reader_helper.h" #include "gen_cpp/PlanNodes_types.h" namespace doris { class Block; +struct DeleteFileDesc; +namespace io { +struct FileDescription; +struct FileSystemProperties; +} // namespace io } // namespace doris namespace doris::iceberg { @@ -50,144 +48,92 @@ class IcebergTableReader : public reader::TableReader { public: ~IcebergTableReader() override = default; - Status prepare_split(const reader::SplitReadOptions& options) override { - _row_lineage_columns = {}; - if (options.current_range.__isset.table_format_params && - options.current_range.table_format_params.__isset.iceberg_params) { - const auto& iceberg_params = options.current_range.table_format_params.iceberg_params; - if (iceberg_params.__isset.first_row_id) { - _row_lineage_columns.first_row_id = iceberg_params.first_row_id; - } - if (iceberg_params.__isset.last_updated_sequence_number) { - _row_lineage_columns.last_updated_sequence_number = - iceberg_params.last_updated_sequence_number; - } - } - return TableReader::prepare_split(options); - } + Status prepare_split(const reader::SplitReadOptions& options) override; protected: // 将 file-local block 转换为 table/global schema block。 // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列 // 物化以及复杂列 remap。 - Status finalize_chunk(Block* block) override { - // 真实实现会根据 ColumnMapping 执行 finalize_expr/default/partition/generated - // expressions,把 file-local block 写成 table block。 - RETURN_IF_ERROR(apply_equality_deletes(block)); - return Status::OK(); - } - - // 物化 Iceberg 虚拟列。 - // 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。 - Status materialize_virtual_columns(Block* table_block) override { - for (size_t column_idx = 0; column_idx < _data_reader.column_mapper.mappings().size(); - ++column_idx) { - const auto& mapping = _data_reader.column_mapper.mappings()[column_idx]; - switch (mapping.virtual_column_type) { - case reader::TableVirtualColumnType::ROW_ID: - RETURN_IF_ERROR(_materialize_row_lineage_row_id(table_block, column_idx)); - break; - case reader::TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER: - RETURN_IF_ERROR(_materialize_row_lineage_last_updated_sequence_number(table_block, - column_idx)); - break; - case reader::TableVirtualColumnType::INVALID: - break; - } - } - return Status::OK(); - } - - // 将 Iceberg position delete / deletion vector 转换成底层 reader 可消费的删除信息。 - // 这一步发生在读取 data file 前,因此会修改 FileScanRequest。 - Status apply_position_deletes(reader::FileScanRequest* request) { - // 真实实现会把 position delete / deletion vector 转换成 file-local delete 信息。 - (void)request; - return Status::OK(); - } - - Status customize_file_scan_request(reader::FileScanRequest* file_request) override { - if (_row_lineage_columns.first_row_id < 0 || !_need_row_lineage_row_id()) { - return Status::OK(); - } - DORIS_CHECK(file_request != nullptr); - const auto row_position_column_id = - doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID; - if (file_request->column_positions.count(row_position_column_id) > 0) { - return Status::OK(); - } - _row_position_block_position = file_request->column_positions.size(); - file_request->non_predicate_columns.push_back(row_position_column_id); - file_request->column_positions.emplace(row_position_column_id, - _row_position_block_position); - _data_reader.block_schema.push_back( - doris::parquet::ParquetColumnReaderFactory::row_position_schema_field()); - return Status::OK(); - } + Status finalize_chunk(Block* block, const size_t rows) override; + + Status materialize_virtual_columns(Block* table_block) override; + + Status customize_file_scan_request(reader::FileScanRequest* file_request) override; + + Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc, + bool* has_delete_file) override; + + Status _collect_position_delete_rows(const TTableFormatFileDesc& t_desc); // 在 table block 上应用 equality delete。 // equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。 - Status apply_equality_deletes(Block* block) { - // 真实实现会在 table block 上应用 equality delete。 - return Status::OK(); - } + Status apply_equality_deletes(Block* block); private: + static constexpr int MIN_SUPPORT_DELETE_FILES_VERSION = 2; + static constexpr int POSITION_DELETE = 1; + static constexpr int EQUALITY_DELETE = 2; + static constexpr int DELETION_VECTOR = 3; + struct RowLineageColumns { int64_t first_row_id = -1; int64_t last_updated_sequence_number = -1; }; - Status _materialize_row_lineage_row_id(Block* table_block, size_t column_idx) { - if (_row_lineage_columns.first_row_id < 0) { - return Status::OK(); - } - DORIS_CHECK(_row_position_block_position < _data_reader.block_template.columns()); - const auto& row_position_column = assert_cast( - *_data_reader.block_template.get_by_position(_row_position_block_position).column); - DORIS_CHECK(row_position_column.size() == table_block->rows()); - auto column = table_block->get_by_position(column_idx) - .column->convert_to_full_column_if_const() - ->assume_mutable(); - auto* nullable_column = assert_cast(column.get()); - auto& null_map = nullable_column->get_null_map_data(); - auto& data = - assert_cast(*nullable_column->get_nested_column_ptr()).get_data(); - null_map.resize(row_position_column.size()); - std::fill(null_map.begin(), null_map.end(), 0); - data.resize(row_position_column.size()); - for (size_t row = 0; row < row_position_column.size(); ++row) { - data[row] = _row_lineage_columns.first_row_id + row_position_column.get_element(row); - } - table_block->replace_by_position(column_idx, std::move(column)); - return Status::OK(); - } + static constexpr const char* ICEBERG_FILE_PATH = "file_path"; + static constexpr const char* ICEBERG_ROW_POS = "pos"; + static constexpr size_t ICEBERG_FILE_PATH_BLOCK_POSITION = 0; + static constexpr size_t ICEBERG_ROW_POS_BLOCK_POSITION = 1; + + class PositionDeleteBlockCollector final { + public: + PositionDeleteBlockCollector(std::string data_file_path, + std::map* rows); + + Status collect(const Block& block, size_t read_rows); + + private: + std::string _data_file_path; + std::map* _rows = nullptr; + }; + + static std::string _iceberg_delete_vector_cache_key(const TIcebergDeleteFileDesc& delete_file); + + static std::shared_ptr _delete_file_system_properties( + const TFileScanRangeParams& scan_params); + + static std::unique_ptr _delete_file_description(const TFileRangeDesc& range); + + static const reader::SchemaField* _find_delete_field( + const std::vector& schema, const std::string& name); + + static Block _build_position_delete_block(const reader::SchemaField& file_path_field, + const reader::SchemaField& pos_field); + + Status _append_row_position_output_column(reader::FileScanRequest* request); + + std::string _data_file_path() const; + + Status _read_parquet_position_delete_file(const TIcebergDeleteFileDesc& delete_file, + const TFileScanRangeParams& scan_params, + IcebergDeleteFileIOContext* delete_io_ctx, + PositionDeleteBlockCollector* collector); + + Status _read_position_delete_files(const std::vector& delete_files); + + Status _materialize_row_lineage_row_id(Block* table_block, size_t column_idx); Status _materialize_row_lineage_last_updated_sequence_number(Block* table_block, - size_t column_idx) { - if (_row_lineage_columns.last_updated_sequence_number < 0) { - return Status::OK(); - } - const auto rows = table_block->rows(); - auto data_column = table_block->get_by_position(column_idx).type->create_column(); - data_column->insert(Field::create_field( - _row_lineage_columns.last_updated_sequence_number)); - auto column = ColumnConst::create(std::move(data_column), rows); - table_block->replace_by_position(column_idx, std::move(column)); - return Status::OK(); - } + size_t column_idx); RowLineageColumns _row_lineage_columns; size_t _row_position_block_position = 0; + const TIcebergFileDesc* _iceberg_params = nullptr; + bool _delete_predicates_initialized = false; + reader::DeleteRows _position_delete_rows_storage; + std::vector _equality_delete_files; - bool _need_row_lineage_row_id() const { - for (const auto& mapping : _data_reader.column_mapper.mappings()) { - if (mapping.virtual_column_type == reader::TableVirtualColumnType::ROW_ID) { - return true; - } - } - return false; - } + bool _need_row_lineage_row_id() const; }; } // namespace doris::iceberg diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index 00938482d6c3c0..0be12c271293cc 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -42,6 +42,8 @@ #include "exprs/vexpr_context.h" #include "format/new_parquet/column_reader.h" #include "format/reader/column_mapper.h" +#include "format/reader/expr/delete_predicate.h" +#include "format/reader/expr/slot_ref.h" #include "format/reader/file_reader.h" #include "format/reader/table_reader.h" #include "gen_cpp/Types_types.h" @@ -655,6 +657,98 @@ TEST_F(NewParquetReaderTest, RowPositionReaderKeepsPositionsAfterSelection) { EXPECT_EQ(row_position_column.get_element(2), 4); } +TEST_F(NewParquetReaderTest, DeletePredicateFiltersRowPositions) { + auto reader = create_reader(); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + Block block = build_file_block_with_row_position(schema); + + static const std::vector deleted_rows {1, 3}; + auto delete_predicate = std::make_shared(deleted_rows); + delete_predicate->add_child(TableSlotRef::create_shared( + 2, 2, -1, std::make_shared(), + parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME)); + + auto request = std::make_unique(); + request->predicate_columns = {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID}; + request->non_predicate_columns = {0}; + request->column_positions = { + {0, 0}, + {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2}, + }; + reader::FileExpressionFilter delete_filter; + delete_filter.delete_conjunct = VExprContext::create_shared(std::move(delete_predicate)); + delete_filter.file_column_ids.push_back( + parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID); + request->expression_filters.push_back(std::move(delete_filter)); + ASSERT_TRUE(reader->open(request).ok()); + + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + EXPECT_FALSE(eof); + ASSERT_EQ(rows, 3); + + const auto& id_column = assert_cast(*block.get_by_position(0).column); + const auto& row_position_column = + assert_cast(*block.get_by_position(2).column); + EXPECT_EQ(id_column.get_element(0), 1); + EXPECT_EQ(id_column.get_element(1), 3); + EXPECT_EQ(id_column.get_element(2), 5); + EXPECT_EQ(row_position_column.get_element(0), 0); + EXPECT_EQ(row_position_column.get_element(1), 2); + EXPECT_EQ(row_position_column.get_element(2), 4); +} + +TEST_F(NewParquetReaderTest, QueryPredicateAndDeletePredicateFilterRowPositions) { + auto reader = create_reader(); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + Block block = build_file_block_with_row_position(schema); + + static const std::vector deleted_rows {3}; + auto delete_predicate = std::make_shared(deleted_rows); + delete_predicate->add_child(TableSlotRef::create_shared( + 2, 2, -1, std::make_shared(), + parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME)); + + auto request = std::make_unique(); + request->predicate_columns = {0, parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID}; + request->non_predicate_columns = {}; + request->column_positions = { + {0, 0}, + {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2}, + }; + reader::FileExpressionFilter expression_filter; + expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2); + expression_filter.delete_conjunct = VExprContext::create_shared(std::move(delete_predicate)); + expression_filter.file_column_ids.push_back(0); + expression_filter.file_column_ids.push_back( + parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID); + request->expression_filters.push_back(std::move(expression_filter)); + ASSERT_TRUE(reader->open(request).ok()); + + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + EXPECT_FALSE(eof); + ASSERT_EQ(rows, 2); + + const auto& id_column = assert_cast(*block.get_by_position(0).column); + const auto& row_position_column = + assert_cast(*block.get_by_position(2).column); + EXPECT_EQ(id_column.get_element(0), 3); + EXPECT_EQ(id_column.get_element(1), 5); + EXPECT_EQ(row_position_column.get_element(0), 2); + EXPECT_EQ(row_position_column.get_element(1), 4); +} + TEST_F(NewParquetReaderTest, RowPositionReaderUsesFileLocalPositionsForScanRange) { write_parquet_file(_file_path, 2); auto parquet_file_reader = ::parquet::ParquetFileReader::OpenFile(_file_path, false); diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp index dc050976836b93..8705775485f02f 100644 --- a/be/test/format/reader/table_reader_test.cpp +++ b/be/test/format/reader/table_reader_test.cpp @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -37,10 +38,16 @@ #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" +#include "exec/common/endian.h" #include "exprs/vexpr.h" +#include "format/format_common.h" #include "format/reader/expr/slot_ref.h" +#include "format/table/deletion_vector_reader.h" #include "format/table/iceberg_reader_v2.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/io_common.h" +#include "roaring/roaring64map.hh" +#include "runtime/runtime_profile.h" #include "runtime/runtime_state.h" #include "storage/predicate/predicate_creator.h" @@ -80,6 +87,58 @@ class TableInt32GreaterThanExpr final : public VExpr { const std::string _expr_name = "TableInt32GreaterThanExpr"; }; +class IcebergTableReaderDeleteFileTestHelper final : public doris::iceberg::IcebergTableReader { +public: + Status parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc, + bool* has_delete_file) { + return _parse_deletion_vector_file(t_desc, desc, has_delete_file); + } +}; + +class IcebergTableReaderScanRequestTestHelper final : public doris::iceberg::IcebergTableReader { +public: + Status init_for_scan_request_test(std::vector projected_columns) { + _query_options = std::make_unique(); + _query_globals = std::make_unique(); + _state = std::make_unique(*_query_options, *_query_globals); + RETURN_IF_ERROR(init({ + .projected_columns = std::move(projected_columns), + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = nullptr, + .io_ctx = nullptr, + .runtime_state = _state.get(), + .scanner_profile = nullptr, + .allow_missing_columns = true, + .profile = nullptr, + })); + + SplitReadOptions split_options; + split_options.current_range.__set_path("scan-request-test.parquet"); + TTableFormatFileDesc table_format_params; + TIcebergFileDesc iceberg_params; + iceberg_params.__set_first_row_id(1000); + table_format_params.__set_iceberg_params(iceberg_params); + split_options.current_range.__set_table_format_params(table_format_params); + RETURN_IF_ERROR(prepare_split(split_options)); + + _delete_rows_storage = {1}; + _delete_rows = &_delete_rows_storage; + return Status::OK(); + } + + Status customize_request(FileScanRequest* request) { + return customize_file_scan_request(request); + } + +private: + std::unique_ptr _query_options; + std::unique_ptr _query_globals; + std::unique_ptr _state; + DeleteRows _delete_rows_storage; +}; + class TableInt32SumGreaterThanExpr final : public VExpr { public: TableInt32SumGreaterThanExpr(int left_slot_id, int left_column_id, int right_slot_id, @@ -174,6 +233,14 @@ std::shared_ptr build_int32_array(const std::vector& valu return finish_array(&builder); } +std::shared_ptr build_int64_array(const std::vector& values) { + arrow::Int64Builder builder; + for (const auto value : values) { + EXPECT_TRUE(builder.Append(value).ok()); + } + return finish_array(&builder); +} + std::shared_ptr build_string_array(const std::vector& values) { arrow::StringBuilder builder; for (const auto& value : values) { @@ -227,6 +294,54 @@ void write_int_pair_parquet_file(const std::string& file_path, const std::vector write_row_group_size, builder.build())); } +void write_position_delete_parquet_file(const std::string& file_path, + const std::vector& data_file_paths, + const std::vector& positions) { + auto schema = arrow::schema({ + arrow::field("file_path", arrow::utf8(), false), + arrow::field("pos", arrow::int64(), false), + }); + auto table = arrow::Table::Make(schema, + {build_string_array(data_file_paths), + build_int64_array(positions)}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable( + *table, arrow::default_memory_pool(), out, static_cast(positions.size()), + builder.build())); +} + +int64_t write_iceberg_deletion_vector_file(const std::string& file_path, + const std::vector& deleted_positions) { + roaring::Roaring64Map rows; + for (const auto position : deleted_positions) { + rows.add(position); + } + + const size_t bitmap_size = rows.getSizeInBytes(); + std::vector blob(4 + 4 + bitmap_size + 4); + rows.write(blob.data() + 8); + + const uint32_t total_length = static_cast(4 + bitmap_size); + BigEndian::Store32(blob.data(), total_length); + constexpr char DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'}; + memcpy(blob.data() + 4, DV_MAGIC, 4); + BigEndian::Store32(blob.data() + 8 + bitmap_size, 0); + + std::ofstream output(file_path, std::ios::binary); + EXPECT_TRUE(output.is_open()); + output.write(blob.data(), static_cast(blob.size())); + EXPECT_TRUE(output.good()); + return static_cast(blob.size()); +} + Block build_table_block(const std::vector& columns) { Block block; for (const auto& column : columns) { @@ -266,6 +381,81 @@ void set_iceberg_row_lineage_params(SplitReadOptions* split_options, int64_t fir split_options->current_range.__set_table_format_params(table_format_params); } +TIcebergDeleteFileDesc make_iceberg_deletion_vector(const std::string& path, int64_t offset, + int64_t size) { + TIcebergDeleteFileDesc delete_file; + delete_file.__set_content(3); + delete_file.__set_path(path); + delete_file.__set_content_offset(offset); + delete_file.__set_content_size_in_bytes(size); + return delete_file; +} + +TIcebergDeleteFileDesc make_iceberg_position_delete_file(const std::string& path) { + TIcebergDeleteFileDesc delete_file; + delete_file.__set_content(1); + delete_file.__set_path(path); + delete_file.__set_file_format(TFileFormatType::FORMAT_PARQUET); + return delete_file; +} + +TFileScanRangeParams make_local_parquet_scan_params() { + TFileScanRangeParams scan_params; + scan_params.__set_file_type(TFileType::FILE_LOCAL); + scan_params.__set_format_type(TFileFormatType::FORMAT_PARQUET); + return scan_params; +} + +std::shared_ptr make_io_context(io::FileReaderStats* file_reader_stats, + io::FileCacheStatistics* file_cache_stats) { + auto io_ctx = std::make_shared(); + io_ctx->file_reader_stats = file_reader_stats; + io_ctx->file_cache_stats = file_cache_stats; + return io_ctx; +} + +std::unique_ptr make_table_read_profile(RuntimeProfile* profile) { + auto read_profile = std::make_unique(); + read_profile->num_delete_files = ADD_COUNTER(profile, "NumDeleteFiles", TUnit::UNIT); + read_profile->num_delete_rows = ADD_COUNTER(profile, "NumDeleteRows", TUnit::UNIT); + read_profile->parse_delete_file_time = ADD_TIMER(profile, "ParseDeleteFileTime"); + return read_profile; +} + +TTableFormatFileDesc make_iceberg_table_format_desc( + const std::string& data_file_path, const std::vector& delete_files) { + TTableFormatFileDesc table_format_params; + TIcebergFileDesc iceberg_params; + iceberg_params.__set_format_version(2); + iceberg_params.__set_original_file_path(data_file_path); + iceberg_params.__set_delete_files(delete_files); + table_format_params.__set_iceberg_params(iceberg_params); + return table_format_params; +} + +std::vector read_iceberg_ids( + doris::iceberg::IcebergTableReader* reader, + const std::vector& projected_columns) { + std::vector ids; + bool eos = false; + while (!eos) { + Block block = build_table_block(projected_columns); + auto status = reader->get_block(&block, &eos); + if (!status.ok()) { + ADD_FAILURE() << status; + return ids; + } + if (block.rows() == 0) { + continue; + } + const auto& id_column = assert_cast(*block.get_by_position(0).column); + for (size_t row = 0; row < block.rows(); ++row) { + ids.push_back(id_column.get_element(row)); + } + } + return ids; +} + int64_t parquet_column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) { return column_metadata.has_dictionary_page() ? static_cast(column_metadata.dictionary_page_offset()) @@ -936,6 +1126,217 @@ TEST(TableReaderTest, IcebergVirtualColumnsKeepRowLineageAfterRowGroupPredicateP std::filesystem::remove_all(test_dir); } +TEST(TableReaderTest, IcebergDeletionVectorUsesTableReaderDeleteFileInterface) { + TTableFormatFileDesc table_format_desc; + TIcebergFileDesc iceberg_desc; + iceberg_desc.__set_format_version(2); + iceberg_desc.__set_delete_files({make_iceberg_deletion_vector("dv.bin", 8, 128)}); + table_format_desc.__set_iceberg_params(iceberg_desc); + + IcebergTableReaderDeleteFileTestHelper reader; + DeleteFileDesc desc; + bool has_delete_file = false; + ASSERT_TRUE(reader.parse_deletion_vector_file(table_format_desc, &desc, &has_delete_file).ok()); + + EXPECT_TRUE(has_delete_file); + EXPECT_EQ(desc.path, "dv.bin"); + EXPECT_EQ(desc.start_offset, 8); + EXPECT_EQ(desc.size, 128); + EXPECT_EQ(desc.file_size, -1); + EXPECT_EQ(desc.format, DeleteFileDesc::Format::ICEBERG); +} + +TEST(TableReaderTest, IcebergDeletionVectorRejectsMultipleDeleteFiles) { + TTableFormatFileDesc table_format_desc; + TIcebergFileDesc iceberg_desc; + iceberg_desc.__set_format_version(2); + iceberg_desc.__set_delete_files({make_iceberg_deletion_vector("dv-a.bin", 8, 128), + make_iceberg_deletion_vector("dv-b.bin", 16, 256)}); + table_format_desc.__set_iceberg_params(iceberg_desc); + + IcebergTableReaderDeleteFileTestHelper reader; + DeleteFileDesc desc; + bool has_delete_file = false; + auto status = reader.parse_deletion_vector_file(table_format_desc, &desc, &has_delete_file); + + EXPECT_FALSE(status.ok()); +} + +TEST(TableReaderTest, IcebergTableReaderAppliesDeletionVectorFile) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_iceberg_deletion_vector_file_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto dv_path = (test_dir / "delete-vector.bin").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40, 50}, + {"one", "two", "three", "four", "five"}); + const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0, 4}); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile = &profile, + .allow_missing_columns = true, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( + file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size)})); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector({2, 3, 4})); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, IcebergTableReaderAppliesPositionDeleteFile) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_iceberg_position_delete_file_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto delete_file_path = (test_dir / "position-delete.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40, 50}, + {"one", "two", "three", "four", "five"}); + write_position_delete_parquet_file(delete_file_path, {file_path, file_path}, {1, 3}); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile = &profile, + .allow_missing_columns = true, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( + file_path, {make_iceberg_position_delete_file(delete_file_path)})); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector({1, 3, 5})); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, IcebergTableReaderMergesDeletionVectorAndPositionDeleteFiles) { + const auto test_dir = + std::filesystem::temp_directory_path() / "doris_iceberg_delete_files_merge_test"; + std::filesystem::remove_all(test_dir); + std::filesystem::create_directories(test_dir); + + const auto file_path = (test_dir / "split.parquet").string(); + const auto dv_path = (test_dir / "delete-vector.bin").string(); + const auto position_delete_path = (test_dir / "position-delete.parquet").string(); + write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40, 50}, + {"one", "two", "three", "four", "five"}); + const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0}); + write_position_delete_parquet_file(position_delete_path, {file_path, file_path}, {3, 3}); + + std::vector projected_columns; + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + RuntimeProfile profile("test_profile"); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + auto scan_params = make_local_parquet_scan_params(); + io::FileReaderStats file_reader_stats; + io::FileCacheStatistics file_cache_stats; + auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); + ShardedKVCache cache(1); + doris::iceberg::IcebergTableReader reader; + ASSERT_TRUE(reader.init({ + .projected_columns = projected_columns, + .column_predicates = {}, + .conjuncts = VExprContext(nullptr), + .format = FileFormat::PARQUET, + .scan_params = &scan_params, + .io_ctx = io_ctx, + .runtime_state = &state, + .scanner_profile = &profile, + .allow_missing_columns = true, + .profile = make_table_read_profile(&profile), + }) + .ok()); + + auto split_options = build_split_options(file_path); + split_options.cache = &cache; + split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( + file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size), + make_iceberg_position_delete_file(position_delete_path)})); + ASSERT_TRUE(reader.prepare_split(split_options).ok()); + + EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector({2, 3, 5})); + + ASSERT_TRUE(reader.close().ok()); + std::filesystem::remove_all(test_dir); +} + +TEST(TableReaderTest, RowPositionDeletePredicateColumnIsNotRepeatedAsOutputColumn) { + const auto row_position_column_id = + doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID; + std::vector projected_columns; + projected_columns.push_back( + make_table_column(100, "_row_id", make_nullable(std::make_shared()))); + projected_columns.push_back(make_table_column(0, "id", std::make_shared())); + + IcebergTableReaderScanRequestTestHelper reader; + ASSERT_TRUE(reader.init_for_scan_request_test(projected_columns).ok()); + + FileScanRequest request; + request.non_predicate_columns.push_back(0); + request.column_positions.emplace(0, 0); + + ASSERT_TRUE(reader.customize_request(&request).ok()); + + EXPECT_EQ(request.predicate_columns, std::vector({row_position_column_id})); + EXPECT_EQ(request.non_predicate_columns, std::vector({0})); + ASSERT_TRUE(request.column_positions.contains(row_position_column_id)); + EXPECT_EQ(request.column_positions.at(row_position_column_id), 1); + ASSERT_EQ(request.expression_filters.size(), 1); + EXPECT_NE(request.expression_filters[0].delete_conjunct, nullptr); +} + TEST(TableReaderTest, ParquetReaderReadsOnlyRowGroupsInFileRange) { const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_file_range_test";