diff --git a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp index 8d1724a4396def..968ac987e9a852 100644 --- a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp +++ b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp @@ -19,6 +19,8 @@ #include +#include "format/table/iceberg/schema.h" +#include "format/table/iceberg/types.h" #include "format/transformer/vorc_transformer.h" #include "format/transformer/vparquet_transformer.h" #include "io/file_factory.h" @@ -26,6 +28,20 @@ namespace doris { +// Iceberg reserved field IDs for position delete files. +constexpr int POSITION_DELETE_FILE_PATH_ID = 2147483546; +constexpr int POSITION_DELETE_POS_ID = 2147483545; + +std::unique_ptr build_position_delete_schema() { + std::vector fields; + fields.reserve(2); + fields.emplace_back(false, POSITION_DELETE_FILE_PATH_ID, "file_path", + std::make_unique(), std::nullopt); + fields.emplace_back(false, POSITION_DELETE_POS_ID, "pos", std::make_unique(), + std::nullopt); + return std::make_unique(std::move(fields)); +} + VIcebergDeleteFileWriter::VIcebergDeleteFileWriter(TFileContent::type delete_type, const std::string& output_path, TFileFormatType::type file_format, @@ -46,6 +62,7 @@ Status VIcebergDeleteFileWriter::open(RuntimeState* state, RuntimeProfile* profi if (_delete_type != TFileContent::POSITION_DELETES) { return Status::NotSupported("Iceberg delete file writer only supports position deletes"); } + _position_delete_schema = build_position_delete_schema(); _state = state; @@ -83,15 +100,15 @@ Status VIcebergDeleteFileWriter::open(RuntimeState* state, RuntimeProfile* profi ParquetFileOptions parquet_options = {parquet_compression_type, TParquetVersion::PARQUET_1_0, false, false}; - _file_format_transformer.reset(new VParquetTransformer(state, _file_writer.get(), - output_exprs, column_names, false, - parquet_options, nullptr, nullptr)); + _file_format_transformer.reset(new VParquetTransformer( + state, _file_writer.get(), output_exprs, column_names, false, parquet_options, + nullptr, _position_delete_schema.get())); return _file_format_transformer->open(); } case TFileFormatType::FORMAT_ORC: { _file_format_transformer.reset(new VOrcTransformer(state, _file_writer.get(), output_exprs, "", column_names, false, _compress_type, - nullptr)); + _position_delete_schema.get())); return _file_format_transformer->open(); } default: diff --git a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h index e5de7143f2b61c..c242731dc1548a 100644 --- a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h +++ b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h @@ -32,6 +32,9 @@ namespace doris { class RuntimeState; class RuntimeProfile; class ObjectPool; +namespace iceberg { +class Schema; +} namespace io { class FileSystem; @@ -103,6 +106,7 @@ class VIcebergDeleteFileWriter { RuntimeState* _state = nullptr; std::shared_ptr _fs; io::FileWriterPtr _file_writer; + std::unique_ptr _position_delete_schema; std::unique_ptr _file_format_transformer; int32_t _partition_spec_id = 0;