From 8c5032042a53d7b327165952a057dbbe73ad7677 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Wed, 18 Mar 2026 14:27:11 +0800 Subject: [PATCH 1/4] implement insert to paimon table --- be/src/pipeline/exec/operator.cpp | 3 + .../exec/paimon_table_sink_operator.cpp | 34 ++ .../exec/paimon_table_sink_operator.h | 86 ++++ be/src/pipeline/pipeline_fragment_context.cpp | 9 + be/src/runtime/fragment_mgr.cpp | 21 + be/src/runtime/runtime_state.h | 13 + .../vec/sink/writer/async_result_writer.cpp | 4 + .../paimon/vpaimon_partition_writer.cpp | 185 ++++++++ .../writer/paimon/vpaimon_partition_writer.h | 100 +++++ .../writer/paimon/vpaimon_table_writer.cpp | 417 ++++++++++++++++++ .../sink/writer/paimon/vpaimon_table_writer.h | 102 +++++ .../paimon/PaimonExternalCatalog.java | 8 + .../datasource/paimon/PaimonTransaction.java | 233 ++++++++++ .../analyzer/UnboundPaimonTableSink.java | 84 ++++ .../analyzer/UnboundTableSinkCreator.java | 9 + .../translator/PhysicalPlanTranslator.java | 17 + .../apache/doris/nereids/rules/RuleSet.java | 2 + .../apache/doris/nereids/rules/RuleType.java | 2 + .../nereids/rules/analysis/BindSink.java | 60 +++ .../rules/expression/ExpressionRewrite.java | 9 + ...monTableSinkToPhysicalPaimonTableSink.java | 48 ++ .../doris/nereids/trees/plans/PlanType.java | 3 + .../insert/AbstractInsertExecutor.java | 1 + .../insert/InsertIntoTableCommand.java | 17 + .../plans/commands/insert/InsertUtils.java | 3 + .../insert/PaimonInsertCommandContext.java | 24 + .../commands/insert/PaimonInsertExecutor.java | 65 +++ .../plans/logical/LogicalPaimonTableSink.java | 149 +++++++ .../physical/PhysicalPaimonTableSink.java | 106 +++++ .../trees/plans/visitor/SinkVisitor.java | 15 + .../org/apache/doris/planner/DataSink.java | 3 + .../apache/doris/planner/PaimonTableSink.java | 145 ++++++ .../java/org/apache/doris/qe/Coordinator.java | 11 + .../doris/qe/runtime/LoadProcessor.java | 5 + .../transaction/PaimonTransactionManager.java | 33 ++ .../TransactionManagerFactory.java | 
5 + .../doris/transaction/TransactionType.java | 3 +- gensrc/thrift/DataSinks.thrift | 24 + gensrc/thrift/FrontendService.thrift | 2 + 39 files changed, 2059 insertions(+), 1 deletion(-) create mode 100644 be/src/pipeline/exec/paimon_table_sink_operator.cpp create mode 100644 be/src/pipeline/exec/paimon_table_sink_operator.h create mode 100644 be/src/vec/sink/writer/paimon/vpaimon_partition_writer.cpp create mode 100644 be/src/vec/sink/writer/paimon/vpaimon_partition_writer.h create mode 100644 be/src/vec/sink/writer/paimon/vpaimon_table_writer.cpp create mode 100644 be/src/vec/sink/writer/paimon/vpaimon_table_writer.h create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTransaction.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundPaimonTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPaimonTableSinkToPhysicalPaimonTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/PaimonInsertCommandContext.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/PaimonInsertExecutor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPaimonTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPaimonTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/transaction/PaimonTransactionManager.java diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 6019325307139b..0b0ff3dc6cdb2f 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -41,6 +41,7 @@ #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/hive_table_sink_operator.h" #include 
"pipeline/exec/iceberg_table_sink_operator.h" +#include "pipeline/exec/paimon_table_sink_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/local_merge_sort_source_operator.h" @@ -820,6 +821,7 @@ DECLARE_OPERATOR(ResultFileSinkLocalState) DECLARE_OPERATOR(OlapTableSinkLocalState) DECLARE_OPERATOR(OlapTableSinkV2LocalState) DECLARE_OPERATOR(HiveTableSinkLocalState) +DECLARE_OPERATOR(PaimonTableSinkLocalState) DECLARE_OPERATOR(TVFTableSinkLocalState) DECLARE_OPERATOR(IcebergTableSinkLocalState) DECLARE_OPERATOR(SpillIcebergTableSinkLocalState) @@ -941,6 +943,7 @@ template class AsyncWriterSink; template class AsyncWriterSink; template class AsyncWriterSink; +template class AsyncWriterSink; template class AsyncWriterSink; template class AsyncWriterSink; diff --git a/be/src/pipeline/exec/paimon_table_sink_operator.cpp b/be/src/pipeline/exec/paimon_table_sink_operator.cpp new file mode 100644 index 00000000000000..6299cbcd1de801 --- /dev/null +++ b/be/src/pipeline/exec/paimon_table_sink_operator.cpp @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "paimon_table_sink_operator.h" + +#include "common/status.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +Status PaimonTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + auto& p = _parent->cast(); + RETURN_IF_ERROR(_writer->init_properties(p._pool)); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/paimon_table_sink_operator.h b/be/src/pipeline/exec/paimon_table_sink_operator.h new file mode 100644 index 00000000000000..f6b48c7c43b4ad --- /dev/null +++ b/be/src/pipeline/exec/paimon_table_sink_operator.h @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "operator.h" +#include "vec/sink/writer/paimon/vpaimon_table_writer.h" + +namespace doris::pipeline { +#include "common/compile_check_begin.h" + +class PaimonTableSinkOperatorX; + +class PaimonTableSinkLocalState final + : public AsyncWriterSink { +public: + using Base = AsyncWriterSink; + using Parent = PaimonTableSinkOperatorX; + ENABLE_FACTORY_CREATOR(PaimonTableSinkLocalState); + PaimonTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + return Base::open(state); + } + friend class PaimonTableSinkOperatorX; +}; + +class PaimonTableSinkOperatorX final : public DataSinkOperatorX { +public: + using Base = DataSinkOperatorX; + PaimonTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, + const std::vector& t_output_expr) + : Base(operator_id, 0, 0), + _row_desc(row_desc), + _t_output_expr(t_output_expr), + _pool(pool) {} + + Status init(const TDataSink& thrift_sink) override { + RETURN_IF_ERROR(Base::init(thrift_sink)); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); + return vectorized::VExpr::open(_output_vexpr_ctxs, state); + } + + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + return local_state.sink(state, in_block, eos); + } + +private: + friend class PaimonTableSinkLocalState; + template + requires(std::is_base_of_v) + 
friend class AsyncWriterSink; + const RowDescriptor& _row_desc; + vectorized::VExprContextSPtrs _output_vexpr_ctxs; + const std::vector& _t_output_expr; + ObjectPool* _pool = nullptr; +}; + +#include "common/compile_check_end.h" +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 79ef2dbd9b1e16..5932a771d5de99 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -63,6 +63,7 @@ #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/hive_table_sink_operator.h" #include "pipeline/exec/iceberg_table_sink_operator.h" +#include "pipeline/exec/paimon_table_sink_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/local_merge_sort_source_operator.h" @@ -1094,6 +1095,14 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS } break; } + case TDataSinkType::PAIMON_TABLE_SINK: { + if (!thrift_sink.__isset.paimon_table_sink) { + return Status::InternalError("Missing paimon table sink."); + } + _sink = std::make_shared(pool, next_sink_operator_id(), row_desc, + output_exprs); + break; + } case TDataSinkType::MAXCOMPUTE_TABLE_SINK: { if (!thrift_sink.__isset.max_compute_table_sink) { return Status::InternalError("Missing max compute table sink."); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 49f7849187c8d8..f3adcbf06b962e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -566,6 +566,27 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { } } + LOG(INFO) << "coordinator_callback paimon: done=" << req.done + << ", fragment_state_pcd_size=" << req.runtime_state->paimon_commit_datas().size() + << ", runtime_states_count=" << req.runtime_states.size(); + if (auto pcd = 
req.runtime_state->paimon_commit_datas(); !pcd.empty()) { + params.__isset.paimon_commit_datas = true; + params.paimon_commit_datas.insert(params.paimon_commit_datas.end(), pcd.begin(), + pcd.end()); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + auto rs_pcd = rs->paimon_commit_datas(); + LOG(INFO) << "coordinator_callback paimon task_state: pcd_size=" << rs_pcd.size(); + if (!rs_pcd.empty()) { + params.__isset.paimon_commit_datas = true; + params.paimon_commit_datas.insert(params.paimon_commit_datas.end(), + rs_pcd.begin(), rs_pcd.end()); + } + } + } + LOG(INFO) << "coordinator_callback paimon result: isset=" << params.__isset.paimon_commit_datas + << ", count=" << params.paimon_commit_datas.size(); + // Send new errors to coordinator req.runtime_state->get_unreported_errors(&(params.error_log)); params.__isset.error_log = (!params.error_log.empty()); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 126c8f4f6173e6..920b88a41f2e15 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -521,6 +521,16 @@ class RuntimeState { _mc_commit_datas.emplace_back(mc_commit_data); } + std::vector paimon_commit_datas() const { + std::lock_guard lock(_paimon_commit_datas_mutex); + return _paimon_commit_datas; + } + + void add_paimon_commit_datas(const TPaimonCommitData& paimon_commit_data) { + std::lock_guard lock(_paimon_commit_datas_mutex); + _paimon_commit_datas.emplace_back(paimon_commit_data); + } + // local runtime filter mgr, the runtime filter do not have remote target or // not need local merge should regist here. 
the instance exec finish, the local // runtime filter mgr can release the memory of local runtime filter @@ -881,6 +891,9 @@ class RuntimeState { mutable std::mutex _mc_commit_datas_mutex; std::vector _mc_commit_datas; + mutable std::mutex _paimon_commit_datas_mutex; + std::vector _paimon_commit_datas; + std::vector> _op_id_to_local_state; std::unique_ptr _sink_local_state; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 21421b34a60bc9..4621354662ceb4 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -204,7 +204,11 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* opera Status st = Status::OK(); { st = _writer_status.status(); } + LOG(INFO) << "AsyncResultWriter::process_block - before close: st.ok()=" << st.ok() + << ", st=" << st.to_string(); Status close_st = close(st); + LOG(INFO) << "AsyncResultWriter::process_block - after close: close_st.ok()=" << close_st.ok() + << ", close_st=" << close_st.to_string(); { // If it is already failed before, then not update the write status so that we could get // the real reason. diff --git a/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.cpp b/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.cpp new file mode 100644 index 00000000000000..89fc52a9f3ad0d --- /dev/null +++ b/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.cpp @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vpaimon_partition_writer.h" + +#include "io/file_factory.h" +#include "runtime/runtime_state.h" +#include "vec/runtime/vorc_transformer.h" +#include "vec/runtime/vparquet_transformer.h" + +namespace doris { +namespace vectorized { + +VPaimonPartitionWriter::VPaimonPartitionWriter( + std::vector partition_values, + const VExprContextSPtrs& write_output_expr_ctxs, + std::vector write_column_names, WriteInfo write_info, std::string file_name, + int file_name_index, TFileFormatType::type file_format_type, + TFileCompressType::type compress_type, + const std::map& hadoop_conf) + : _partition_values(std::move(partition_values)), + _write_output_expr_ctxs(write_output_expr_ctxs), + _write_column_names(std::move(write_column_names)), + _write_info(std::move(write_info)), + _file_name(std::move(file_name)), + _file_name_index(file_name_index), + _file_format_type(file_format_type), + _compress_type(compress_type), + _hadoop_conf(hadoop_conf) {} + +Status VPaimonPartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + + io::FSPropertiesRef fs_properties(_write_info.file_type); + fs_properties.properties = &_hadoop_conf; + if (!_write_info.broker_addresses.empty()) { + fs_properties.broker_addresses = &(_write_info.broker_addresses); + } + // Files go into bucket-0 subdirectory + std::string target_file = fmt::format("{}/bucket-0/{}", _write_info.write_path, + _get_target_file_name()); + io::FileDescription file_description = {.path = target_file, .fs_name {}}; + _fs = 
DORIS_TRY(FileFactory::create_fs(fs_properties, file_description)); + io::FileWriterOptions file_writer_options = {.used_by_s3_committer = false}; + RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options)); + + switch (_file_format_type) { + case TFileFormatType::FORMAT_PARQUET: { + TParquetCompressionType::type parquet_compression_type; + switch (_compress_type) { + case TFileCompressType::PLAIN: + parquet_compression_type = TParquetCompressionType::UNCOMPRESSED; + break; + case TFileCompressType::SNAPPYBLOCK: + parquet_compression_type = TParquetCompressionType::SNAPPY; + break; + case TFileCompressType::ZSTD: + parquet_compression_type = TParquetCompressionType::ZSTD; + break; + default: + return Status::InternalError("Unsupported compress type {} with parquet", + to_string(_compress_type)); + } + ParquetFileOptions parquet_options = {.compression_type = parquet_compression_type, + .parquet_version = TParquetVersion::PARQUET_1_0, + .parquet_disable_dictionary = false, + .enable_int96_timestamps = false}; + _file_format_transformer = std::make_unique( + state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names, false, + parquet_options, nullptr, nullptr); + return _file_format_transformer->open(); + } + case TFileFormatType::FORMAT_ORC: { + _file_format_transformer = std::make_unique( + state, _file_writer.get(), _write_output_expr_ctxs, "", _write_column_names, false, + _compress_type, nullptr, _fs); + return _file_format_transformer->open(); + } + default: + return Status::InternalError("Unsupported file format type {}", to_string(_file_format_type)); + } +} + +Status VPaimonPartitionWriter::write(vectorized::Block& block) { + RETURN_IF_ERROR(_file_format_transformer->write(block)); + _row_count += block.rows(); + return Status::OK(); +} + +Status VPaimonPartitionWriter::close(const Status& status) { + Status result_status; + if (_file_format_transformer != nullptr) { + result_status = 
_file_format_transformer->close(); + if (!result_status.ok()) { + LOG(WARNING) << fmt::format("paimon partition writer close failed: {}", + result_status.to_string()); + } + } + bool status_ok = result_status.ok() && status.ok(); + LOG(INFO) << fmt::format("VPaimonPartitionWriter::close - result_status.ok()={}, status.ok()={}, status_ok={}, row_count={}, file={}", + result_status.ok(), status.ok(), status_ok, _row_count, + _get_target_file_name()); + if (!status_ok && _fs != nullptr) { + auto path = fmt::format("{}/bucket-0/{}", _write_info.write_path, _get_target_file_name()); + Status st = _fs->delete_file(path); + if (!st.ok()) { + LOG(WARNING) << fmt::format("Delete paimon file {} failed: {}", path, st.to_string()); + } + } + if (status_ok) { + TPaimonCommitData commit_data; + _build_paimon_commit_data(&commit_data); + _state->add_paimon_commit_datas(commit_data); + LOG(INFO) << fmt::format("Added paimon commit data: file_path={}, row_count={}, file_size={}", + commit_data.file_path, commit_data.row_count, commit_data.file_size); + } else { + LOG(WARNING) << fmt::format("Did NOT add paimon commit data due to status_ok=false"); + } + return result_status; +} + +void VPaimonPartitionWriter::_build_paimon_commit_data(TPaimonCommitData* commit_data) { + DCHECK(commit_data != nullptr); + DCHECK(_file_format_transformer != nullptr); + // Full path: original_write_path/bucket-0/filename + commit_data->__set_file_path(fmt::format("{}/bucket-0/{}", _write_info.original_write_path, + _get_target_file_name())); + commit_data->__set_row_count(_row_count); + commit_data->__set_file_size(_file_format_transformer->written_len()); + commit_data->__set_partition_values(_partition_values); +} + +std::string VPaimonPartitionWriter::_get_file_extension(TFileFormatType::type file_format_type, + TFileCompressType::type compress_type) { + std::string compress_name; + switch (compress_type) { + case TFileCompressType::SNAPPYBLOCK: + compress_name = ".snappy"; + break; + case 
TFileCompressType::ZSTD: + compress_name = ".zstd"; + break; + case TFileCompressType::ZLIB: + compress_name = ".zlib"; + break; + default: + compress_name = ""; + break; + } + std::string format_name; + switch (file_format_type) { + case TFileFormatType::FORMAT_PARQUET: + format_name = ".parquet"; + break; + case TFileFormatType::FORMAT_ORC: + format_name = ".orc"; + break; + default: + format_name = ""; + break; + } + return fmt::format("{}{}", compress_name, format_name); +} + +std::string VPaimonPartitionWriter::_get_target_file_name() { + return fmt::format("data-{}-{}{}", _file_name, _file_name_index, + _get_file_extension(_file_format_type, _compress_type)); +} + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.h b/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.h new file mode 100644 index 00000000000000..99d363113af6f7 --- /dev/null +++ b/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.h @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include + +#include "io/fs/file_writer.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/runtime/vfile_format_transformer.h" + +namespace doris { +namespace io { +class FileSystem; +} + +class ObjectPool; +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { + +class Block; +class VFileFormatTransformer; + +class VPaimonPartitionWriter { +public: + struct WriteInfo { + std::string write_path; + std::string original_write_path; + TFileType::type file_type; + std::vector broker_addresses; + }; + + VPaimonPartitionWriter(std::vector partition_values, + const VExprContextSPtrs& write_output_expr_ctxs, + std::vector write_column_names, WriteInfo write_info, + std::string file_name, int file_name_index, + TFileFormatType::type file_format_type, + TFileCompressType::type compress_type, + const std::map& hadoop_conf); + + Status open(RuntimeState* state, RuntimeProfile* profile); + + Status write(vectorized::Block& block); + + Status close(const Status& status); + + inline const std::string& file_name() const { return _file_name; } + + inline int file_name_index() const { return _file_name_index; } + + inline size_t written_len() const { + return _file_format_transformer ? 
_file_format_transformer->written_len() : 0; + } + +private: + std::string _get_target_file_name(); + + std::string _get_file_extension(TFileFormatType::type file_format_type, + TFileCompressType::type compress_type); + + void _build_paimon_commit_data(TPaimonCommitData* commit_data); + + std::vector _partition_values; + size_t _row_count = 0; + + const VExprContextSPtrs& _write_output_expr_ctxs; + std::vector _write_column_names; + + WriteInfo _write_info; + std::string _file_name; + int _file_name_index; + TFileFormatType::type _file_format_type; + TFileCompressType::type _compress_type; + const std::map& _hadoop_conf; + + std::shared_ptr _fs = nullptr; + std::unique_ptr _file_writer = nullptr; + std::unique_ptr _file_format_transformer = nullptr; + + RuntimeState* _state = nullptr; +}; + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/writer/paimon/vpaimon_table_writer.cpp b/be/src/vec/sink/writer/paimon/vpaimon_table_writer.cpp new file mode 100644 index 00000000000000..d6d0aff40e82e4 --- /dev/null +++ b/be/src/vec/sink/writer/paimon/vpaimon_table_writer.cpp @@ -0,0 +1,417 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vpaimon_table_writer.h" + +#include +#include +#include + +#include "runtime/runtime_state.h" +#include "util/uid_util.h" +#include "vec/columns/column_nullable.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/materialize_block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/runtime/vdatetime_value.h" +#include "vec/sink/writer/paimon/vpaimon_partition_writer.h" +#include "vec/sink/writer/vhive_utils.h" + +namespace doris { +namespace vectorized { +#include "common/compile_check_begin.h" + +VPaimonTableWriter::VPaimonTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs, + std::shared_ptr dep, + std::shared_ptr fin_dep) + : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.paimon_table_sink); +} + +Status VPaimonTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + + _written_rows_counter = ADD_COUNTER(_operator_profile, "WrittenRows", TUnit::UNIT); + _send_data_timer = ADD_TIMER(_operator_profile, "SendDataTime"); + _partition_writers_dispatch_timer = + ADD_CHILD_TIMER(_operator_profile, "PartitionsDispatchTime", "SendDataTime"); + _partition_writers_write_timer = + ADD_CHILD_TIMER(_operator_profile, "PartitionsWriteTime", "SendDataTime"); + _partition_writers_count = ADD_COUNTER(_operator_profile, "PartitionsWriteCount", TUnit::UNIT); + _open_timer = ADD_TIMER(_operator_profile, "OpenTime"); + _close_timer = ADD_TIMER(_operator_profile, "CloseTime"); + _write_file_counter = ADD_COUNTER(_operator_profile, "WriteFileCount", TUnit::UNIT); + + SCOPED_TIMER(_open_timer); + + auto& paimon_sink = _t_sink.paimon_table_sink; + // Identify partition vs. 
regular columns using the columns descriptor + for (int i = 0; i < (int)paimon_sink.columns.size(); ++i) { + switch (paimon_sink.columns[i].column_type) { + case THiveColumnType::PARTITION_KEY: + _partition_columns_input_index.emplace_back(i); + _non_write_columns_indices.insert(i); + break; + case THiveColumnType::REGULAR: + _write_output_vexpr_ctxs.push_back(_vec_output_expr_ctxs[i]); + break; + default: + _non_write_columns_indices.insert(i); + break; + } + } + return Status::OK(); +} + +Status VPaimonTableWriter::write(RuntimeState* state, vectorized::Block& block) { + SCOPED_RAW_TIMER(&_send_data_ns); + if (block.rows() == 0) { + return Status::OK(); + } + + Block output_block; + RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( + _vec_output_expr_ctxs, block, &output_block, false)); + materialize_block_inplace(output_block); + + std::unordered_map, IColumn::Filter> writer_positions; + _row_count += output_block.rows(); + + auto& paimon_sink = _t_sink.paimon_table_sink; + + // Non-partitioned table + if (_partition_columns_input_index.empty()) { + std::shared_ptr writer; + { + SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns); + auto it = _partitions_to_writers.find(""); + if (it == _partitions_to_writers.end()) { + try { + writer = _create_partition_writer(output_block, -1); + } catch (doris::Exception& e) { + return e.to_status(); + } + _partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _operator_profile)); + } else { + if (it->second->written_len() > config::hive_sink_max_file_size) { + std::string file_name(it->second->file_name()); + int file_name_index = it->second->file_name_index(); + { + SCOPED_RAW_TIMER(&_close_ns); + static_cast(it->second->close(Status::OK())); + } + _partitions_to_writers.erase(it); + try { + writer = _create_partition_writer(output_block, -1, &file_name, + file_name_index + 1); + } catch (doris::Exception& e) { + return e.to_status(); + } + 
_partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _operator_profile)); + } else { + writer = it->second; + } + } + } + SCOPED_RAW_TIMER(&_partition_writers_write_ns); + output_block.erase(_non_write_columns_indices); + RETURN_IF_ERROR(writer->write(output_block)); + return Status::OK(); + } + + // Partitioned table - dispatch each row to the right partition writer + { + SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns); + for (int i = 0; i < (int)output_block.rows(); ++i) { + std::vector partition_values; + try { + partition_values = _create_partition_values(output_block, i); + } catch (doris::Exception& e) { + return e.to_status(); + } + + // Build partition key string: "col1=val1/col2=val2" + std::string partition_name; + for (int j = 0; j < (int)_partition_columns_input_index.size(); ++j) { + if (j > 0) { + partition_name += "/"; + } + const std::string& col_name = + paimon_sink.columns[_partition_columns_input_index[j]].name; + partition_name += + VHiveUtils::escape_path_name(col_name) + "=" + + VHiveUtils::escape_path_name(partition_values[j]); + } + + auto create_and_open_writer = + [&](int position, const std::string* file_name, int file_name_index, + std::shared_ptr& writer_out) -> Status { + try { + auto w = _create_partition_writer(output_block, position, file_name, + file_name_index); + RETURN_IF_ERROR(w->open(_state, _operator_profile)); + IColumn::Filter filter(output_block.rows(), 0); + filter[position] = 1; + writer_positions.insert({w, std::move(filter)}); + _partitions_to_writers.insert({partition_name, w}); + writer_out = w; + } catch (doris::Exception& e) { + return e.to_status(); + } + return Status::OK(); + }; + + auto writer_it = _partitions_to_writers.find(partition_name); + if (writer_it == _partitions_to_writers.end()) { + if (_partitions_to_writers.size() + 1 > + config::table_sink_partition_write_max_partition_nums_per_writer) { + return Status::InternalError( + "Too many open partitions {}", + 
config::table_sink_partition_write_max_partition_nums_per_writer); + } + std::shared_ptr w; + RETURN_IF_ERROR(create_and_open_writer(i, nullptr, 0, w)); + } else { + std::shared_ptr w; + if (writer_it->second->written_len() > config::hive_sink_max_file_size) { + std::string file_name(writer_it->second->file_name()); + int file_name_index = writer_it->second->file_name_index(); + { + SCOPED_RAW_TIMER(&_close_ns); + static_cast(writer_it->second->close(Status::OK())); + } + writer_positions.erase(writer_it->second); + _partitions_to_writers.erase(writer_it); + RETURN_IF_ERROR( + create_and_open_writer(i, &file_name, file_name_index + 1, w)); + } else { + w = writer_it->second; + auto pos_it = writer_positions.find(w); + if (pos_it == writer_positions.end()) { + IColumn::Filter filter(output_block.rows(), 0); + filter[i] = 1; + writer_positions.insert({w, std::move(filter)}); + } else { + pos_it->second[i] = 1; + } + } + } + } + } + + SCOPED_RAW_TIMER(&_partition_writers_write_ns); + output_block.erase(_non_write_columns_indices); + for (auto& [writer, filter] : writer_positions) { + Block filtered_block; + RETURN_IF_ERROR(_filter_block(output_block, &filter, &filtered_block)); + RETURN_IF_ERROR(writer->write(filtered_block)); + } + return Status::OK(); +} + +Status VPaimonTableWriter::close(Status status) { + Status result_status; + int64_t partitions_count = _partitions_to_writers.size(); + LOG(INFO) << "VPaimonTableWriter::close - called with status.ok()=" << status.ok() + << ", partitions_count=" << partitions_count << ", row_count=" << _row_count; + { + SCOPED_RAW_TIMER(&_close_ns); + for (const auto& [name, writer] : _partitions_to_writers) { + LOG(INFO) << "Closing partition writer for: " << name; + Status st = writer->close(status); + if (!st.ok()) { + LOG(WARNING) << "paimon partition writer close failed for " << name << ": " + << st.to_string(); + if (result_status.ok()) { + result_status = st; + } + } + } + _partitions_to_writers.clear(); + } + if 
(status.ok()) { + SCOPED_TIMER(_operator_profile->total_time_counter()); + COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count)); + COUNTER_SET(_send_data_timer, _send_data_ns); + COUNTER_SET(_partition_writers_dispatch_timer, _partition_writers_dispatch_ns); + COUNTER_SET(_partition_writers_write_timer, _partition_writers_write_ns); + COUNTER_SET(_partition_writers_count, partitions_count); + COUNTER_SET(_close_timer, _close_ns); + COUNTER_SET(_write_file_counter, _write_file_count); + } + return result_status; +} + +Status VPaimonTableWriter::_filter_block(doris::vectorized::Block& block, + const vectorized::IColumn::Filter* filter, + doris::vectorized::Block* output_block) { + const ColumnsWithTypeAndName& cols = block.get_columns_with_type_and_name(); + vectorized::ColumnsWithTypeAndName result; + for (const auto& col : cols) { + result.emplace_back(col.column->clone_resized(col.column->size()), col.type, col.name); + } + *output_block = {std::move(result)}; + + std::vector<uint32_t> columns_to_filter(output_block->columns()); + for (uint32_t i = 0; i < output_block->columns(); ++i) { + columns_to_filter[i] = i; + } + Block::filter_block_internal(output_block, columns_to_filter, *filter); + return Status::OK(); +} + +std::shared_ptr<VPaimonPartitionWriter> VPaimonTableWriter::_create_partition_writer( + vectorized::Block& block, int position, const std::string* file_name, int file_name_index) { + auto& paimon_sink = _t_sink.paimon_table_sink; + + std::string partition_path; + std::vector<std::string> partition_values; + + if (!_partition_columns_input_index.empty() && position >= 0) { + partition_values = _create_partition_values(block, position); + for (int j = 0; j < (int)_partition_columns_input_index.size(); ++j) { + if (!partition_path.empty()) { + partition_path += "/"; + } + const std::string& col_name = + paimon_sink.columns[_partition_columns_input_index[j]].name; + partition_path += VHiveUtils::escape_path_name(col_name) + "=" + + VHiveUtils::escape_path_name(partition_values[j]); + } + } + 
+ std::string write_path = partition_path.empty() + ? paimon_sink.output_path + : fmt::format("{}/{}", paimon_sink.output_path, partition_path); + std::string original_write_path = write_path; + + VPaimonPartitionWriter::WriteInfo write_info = { + .write_path = write_path, + .original_write_path = original_write_path, + .file_type = paimon_sink.file_type, + .broker_addresses = {}}; + if (paimon_sink.__isset.broker_addresses) { + write_info.broker_addresses.assign(paimon_sink.broker_addresses.begin(), + paimon_sink.broker_addresses.end()); + } + + std::vector<std::string> column_names; + for (int i = 0; i < (int)paimon_sink.columns.size(); ++i) { + if (!_non_write_columns_indices.count(i)) { + column_names.emplace_back(paimon_sink.columns[i].name); + } + } + + _write_file_count++; + return std::make_shared<VPaimonPartitionWriter>( + partition_values, _write_output_vexpr_ctxs, column_names, + std::move(write_info), file_name ? *file_name : _compute_file_name(), + file_name_index, paimon_sink.file_format, paimon_sink.compression_type, + paimon_sink.hadoop_config); +} + +std::vector<std::string> VPaimonTableWriter::_create_partition_values(vectorized::Block& block, + int position) { + std::vector<std::string> partition_values; + for (int idx : _partition_columns_input_index) { + vectorized::ColumnWithTypeAndName col = block.get_by_position(idx); + std::string value = _to_partition_value( + _vec_output_expr_ctxs[idx]->root()->data_type(), col, position); + partition_values.emplace_back(value); + } + return partition_values; +} + +std::string VPaimonTableWriter::_to_partition_value(const DataTypePtr& type, + const ColumnWithTypeAndName& col, + int position) { + ColumnPtr column; + if (auto* nullable = check_and_get_column<ColumnNullable>(*col.column)) { + if (nullable->get_null_map_data()[position]) { + return "__PAIMON_DEFAULT_PARTITION__"; + } + column = nullable->get_nested_column_ptr(); + } else { + column = col.column; + } + + auto [item, size] = column->get_data_at(position); + switch (type->get_primitive_type()) { + case TYPE_BOOLEAN: { + 
auto val = check_and_get_column<ColumnUInt8>(*column)->get_data()[position]; + return val ? "true" : "false"; + } + case TYPE_TINYINT: + return std::to_string(*reinterpret_cast<const int8_t*>(item)); + case TYPE_SMALLINT: + return std::to_string(*reinterpret_cast<const int16_t*>(item)); + case TYPE_INT: + return std::to_string(*reinterpret_cast<const int32_t*>(item)); + case TYPE_BIGINT: + return std::to_string(*reinterpret_cast<const int64_t*>(item)); + case TYPE_FLOAT: + return std::to_string(*reinterpret_cast<const float*>(item)); + case TYPE_DOUBLE: + return std::to_string(*reinterpret_cast<const double*>(item)); + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: + return std::string(item, size); + case TYPE_DATE: { + VecDateTimeValue value = binary_cast<int64_t, VecDateTimeValue>(*(int64_t*)item); + char buf[64]; + char* pos = value.to_string(buf); + return std::string(buf, pos - buf - 1); + } + case TYPE_DATETIME: { + VecDateTimeValue value = binary_cast<int64_t, VecDateTimeValue>(*(int64_t*)item); + char buf[64]; + char* pos = value.to_string(buf); + return std::string(buf, pos - buf - 1); + } + case TYPE_DATEV2: { + DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)item); + char buf[64]; + char* pos = value.to_string(buf); + return std::string(buf, pos - buf - 1); + } + case TYPE_DATETIMEV2: { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)item); + char buf[64]; + char* pos = value.to_string(buf); + return std::string(buf, pos - buf - 1); + } + default: + return std::string(item, size); + } +} + +std::string VPaimonTableWriter::_compute_file_name() { + boost::uuids::uuid uuid = boost::uuids::random_generator()(); + return fmt::format("{}_{}", print_id(_state->query_id()), boost::uuids::to_string(uuid)); +} + +#include "common/compile_check_end.h" +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/writer/paimon/vpaimon_table_writer.h b/be/src/vec/sink/writer/paimon/vpaimon_table_writer.h new file mode 100644 index 00000000000000..5853257b1b0bac --- /dev/null +++ b/be/src/vec/sink/writer/paimon/vpaimon_table_writer.h @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "util/runtime_profile.h" +#include "vec/columns/column.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/writer/async_result_writer.h" + +namespace doris { + +class ObjectPool; +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { + +class Block; +class VPaimonPartitionWriter; +struct ColumnWithTypeAndName; + +class VPaimonTableWriter final : public AsyncResultWriter { +public: + VPaimonTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr dep, + std::shared_ptr fin_dep); + + ~VPaimonTableWriter() override = default; + + Status init_properties(ObjectPool* pool) { return Status::OK(); } + + Status open(RuntimeState* state, RuntimeProfile* profile) override; + + Status write(RuntimeState* state, vectorized::Block& block) override; + + Status close(Status) override; + +private: + std::shared_ptr _create_partition_writer( + vectorized::Block& block, int position, const std::string* file_name = nullptr, + int file_name_index = 0); + + std::vector _create_partition_values(vectorized::Block& block, int position); + + std::string _to_partition_value(const DataTypePtr& type, + const 
ColumnWithTypeAndName& col, int position); + + std::string _compute_file_name(); + + Status _filter_block(doris::vectorized::Block& block, const vectorized::IColumn::Filter* filter, + doris::vectorized::Block* output_block); + + TDataSink _t_sink; + RuntimeState* _state = nullptr; + + // Indices into the output block for partition columns + std::vector<int> _partition_columns_input_index; + // Column indices that should NOT be written to the data file (partition cols) + std::set<size_t> _non_write_columns_indices; + + std::unordered_map<std::string, std::shared_ptr<VPaimonPartitionWriter>> _partitions_to_writers; + VExprContextSPtrs _write_output_vexpr_ctxs; + + size_t _row_count = 0; + + // profile counters + int64_t _send_data_ns = 0; + int64_t _partition_writers_dispatch_ns = 0; + int64_t _partition_writers_write_ns = 0; + int64_t _close_ns = 0; + int64_t _write_file_count = 0; + + RuntimeProfile::Counter* _written_rows_counter = nullptr; + RuntimeProfile::Counter* _send_data_timer = nullptr; + RuntimeProfile::Counter* _partition_writers_dispatch_timer = nullptr; + RuntimeProfile::Counter* _partition_writers_write_timer = nullptr; + RuntimeProfile::Counter* _partition_writers_count = nullptr; + RuntimeProfile::Counter* _open_timer = nullptr; + RuntimeProfile::Counter* _close_timer = nullptr; + RuntimeProfile::Counter* _write_file_counter = nullptr; +}; + +} // namespace vectorized +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java index 87f606dfcca451..52517c736300bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -26,6 +26,7 @@ import org.apache.doris.datasource.metacache.CacheSpec; import org.apache.doris.datasource.operations.ExternalMetadataOperations; import 
org.apache.doris.datasource.property.metastore.AbstractPaimonProperties; +import org.apache.doris.transaction.TransactionManagerFactory; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.logging.log4j.LogManager; @@ -68,6 +69,13 @@ protected void initLocalObjectsImpl() { catalog = createCatalog(); initPreExecutionAuthenticator(); metadataOps = ExternalMetadataOperations.newPaimonMetaOps(this, catalog); + transactionManager = TransactionManagerFactory.createPaimonTransactionManager( + (PaimonMetadataOps) metadataOps); + } + + public Catalog getPaimonCatalog() { + makeSureInitialized(); + return catalog; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTransaction.java new file mode 100644 index 00000000000000..67de27af6f0c3e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTransaction.java @@ -0,0 +1,233 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.common.UserException; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext; +import org.apache.doris.nereids.trees.plans.commands.insert.PaimonInsertCommandContext; +import org.apache.doris.thrift.TPaimonCommitData; +import org.apache.doris.transaction.Transaction; + +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DateTimeUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * Paimon transaction for direct-write insert operations. + * BE writes ORC/Parquet files directly to table storage; FE commits via Paimon Java API + * using file metadata (path, row count, size, partition values) collected from BEs. 
+ */ +public class PaimonTransaction implements Transaction { + + private static final Logger LOG = LogManager.getLogger(PaimonTransaction.class); + + private final PaimonMetadataOps ops; + private Table table; + private BatchWriteBuilder writeBuilder; + private BatchTableCommit committer; + private final List commitDataList = Lists.newArrayList(); + private PaimonInsertCommandContext insertCtx; + + public PaimonTransaction(PaimonMetadataOps ops) { + this.ops = ops; + } + + public void updatePaimonCommitData(List commitDataList) { + synchronized (this) { + this.commitDataList.addAll(commitDataList); + } + } + + public void beginInsert(PaimonExternalTable dorisTable, Optional ctx) + throws UserException { + ctx.ifPresent(c -> this.insertCtx = (PaimonInsertCommandContext) c); + try { + Catalog paimonCatalog = ((PaimonExternalCatalog) dorisTable.getCatalog()).getPaimonCatalog(); + Identifier identifier = Identifier.create(dorisTable.getDbName(), dorisTable.getName()); + this.table = paimonCatalog.getTable(identifier); + + this.writeBuilder = table.newBatchWriteBuilder(); + if (insertCtx != null && insertCtx.isOverwrite()) { + this.writeBuilder = this.writeBuilder.withOverwrite(); + } + this.committer = writeBuilder.newCommit(); + LOG.info("Paimon insert transaction started for table: {}", dorisTable.getName()); + } catch (Exception e) { + throw new UserException("Failed to begin insert for paimon table " + dorisTable.getName() + + " because: " + e.getMessage(), e); + } + } + + public void finishInsert() { + LOG.debug("Paimon insert finished with {} commit data entries", commitDataList.size()); + } + + @Override + public void commit() throws UserException { + try { + List commitMessages = buildCommitMessages(); + committer.commit(commitMessages); + LOG.info("Paimon transaction committed with {} files", commitMessages.size()); + } catch (Exception e) { + throw new UserException("Failed to commit paimon transaction: " + e.getMessage(), e); + } finally { + 
closeResources(); + } + } + + @Override + public void rollback() { + LOG.info("Paimon transaction rolled back"); + closeResources(); + } + + public long getUpdateCnt() { + return commitDataList.stream().mapToLong(TPaimonCommitData::getRowCount).sum(); + } + + /** + * Build Paimon CommitMessage objects from file metadata collected from BE nodes. + * Each TPaimonCommitData corresponds to one ORC/Parquet file written by a BE. + */ + private List<CommitMessage> buildCommitMessages() throws Exception { + if (commitDataList.isEmpty()) { + return Collections.emptyList(); + } + + FileStoreTable fileStoreTable = (FileStoreTable) table; + long schemaId = fileStoreTable.schema().id(); + List<String> partitionKeys = table.partitionKeys(); + RowType partitionType = table.rowType().project(partitionKeys); + + List<CommitMessage> messages = new ArrayList<>(); + for (TPaimonCommitData data : commitDataList) { + // Extract just the file name from the full path + String filePath = data.getFilePath(); + String fileName = filePath.substring(filePath.lastIndexOf('/') + 1); + + DataFileMeta fileMeta = DataFileMeta.forAppend( + fileName, + data.getFileSize(), + data.getRowCount(), + SimpleStats.EMPTY_STATS, + 0L, // minSequenceNumber + 0L, // maxSequenceNumber + schemaId, + Collections.emptyList(), + null, // embeddedIndex + FileSource.APPEND, + null, // valueStatsCols + null); // externalPath + + List<String> partitionValues = data.isSetPartitionValues() + ? 
data.getPartitionValues() : Collections.emptyList(); + BinaryRow partition = createPartitionBinaryRow(partitionValues, partitionType); + + CommitMessage message = new CommitMessageImpl( + partition, + 0, // bucket-0 + null, // totalBuckets + new DataIncrement( + Collections.singletonList(fileMeta), + Collections.emptyList(), + Collections.emptyList()), + CompactIncrement.emptyIncrement()); + + messages.add(message); + } + return messages; + } + + private BinaryRow createPartitionBinaryRow(List partitionValues, RowType partitionType) { + if (partitionValues.isEmpty()) { + return BinaryRow.EMPTY_ROW; + } + GenericRow row = new GenericRow(partitionValues.size()); + for (int i = 0; i < partitionValues.size(); i++) { + DataType fieldType = partitionType.getTypeAt(i); + row.setField(i, convertPartitionValue(partitionValues.get(i), fieldType)); + } + InternalRowSerializer serializer = new InternalRowSerializer(partitionType); + return serializer.toBinaryRow(row).copy(); + } + + private Object convertPartitionValue(String value, DataType type) { + DataTypeRoot root = type.getTypeRoot(); + switch (root) { + case VARCHAR: + case CHAR: + return BinaryString.fromString(value); + case BOOLEAN: + return Boolean.parseBoolean(value); + case TINYINT: + return Byte.parseByte(value); + case SMALLINT: + return Short.parseShort(value); + case INTEGER: + return Integer.parseInt(value); + case BIGINT: + return Long.parseLong(value); + case FLOAT: + return Float.parseFloat(value); + case DOUBLE: + return Double.parseDouble(value); + case DATE: + // Paimon stores dates as int (days since epoch) + return DateTimeUtils.parseDate(value); + default: + // Fallback to string representation + return BinaryString.fromString(value); + } + } + + private void closeResources() { + try { + if (committer != null) { + committer.close(); + } + } catch (Exception e) { + LOG.warn("Failed to close paimon committer", e); + } + } +} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundPaimonTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundPaimonTableSink.java new file mode 100644 index 00000000000000..86a5a75655d9fa --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundPaimonTableSink.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.analyzer; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Optional; + +/** + * Represent a paimon table sink plan node that has not been bound. 
+ */ +public class UnboundPaimonTableSink extends UnboundBaseExternalTableSink { + + public UnboundPaimonTableSink(List nameParts, List colNames, List hints, + List partitions, CHILD_TYPE child) { + this(nameParts, colNames, hints, partitions, DMLCommandType.NONE, + Optional.empty(), Optional.empty(), child); + } + + /** + * constructor + */ + public UnboundPaimonTableSink(List nameParts, + List colNames, + List hints, + List partitions, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(nameParts, PlanType.LOGICAL_UNBOUND_PAIMON_TABLE_SINK, ImmutableList.of(), groupExpression, + logicalProperties, colNames, dmlCommandType, child, hints, partitions); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitUnboundPaimonTableSink(this, context); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, + "UnboundPaimonTableSink only accepts one child"); + return new UnboundPaimonTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, Optional.empty(), children.get(0)); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new UnboundPaimonTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new UnboundPaimonTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index e16902cd76c3f3..b7a8439b35b981 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -26,6 +26,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.dictionary.Dictionary; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.exceptions.ParseException; @@ -65,6 +66,8 @@ public static LogicalSink createUnboundTableSink(List na return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, query); } else if (curCatalog instanceof MaxComputeExternalCatalog) { return new UnboundMaxComputeTableSink<>(nameParts, colNames, hints, partitions, query); + } else if (curCatalog instanceof PaimonExternalCatalog) { + return new UnboundPaimonTableSink<>(nameParts, colNames, hints, partitions, query); } throw new UserException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); } @@ -106,6 +109,9 @@ public static LogicalSink createUnboundTableSink(List na } else if (curCatalog instanceof MaxComputeExternalCatalog) { return new UnboundMaxComputeTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan, staticPartitionKeyValues); + } else if (curCatalog instanceof PaimonExternalCatalog) { + return new UnboundPaimonTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); } @@ -147,6 +153,9 @@ public static LogicalSink createUnboundTableSinkMaybeOverwrite(L } else if (curCatalog instanceof MaxComputeExternalCatalog && !isAutoDetectPartition) { return 
new UnboundMaxComputeTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan, staticPartitionKeyValues); + } else if (curCatalog instanceof PaimonExternalCatalog && !isAutoDetectPartition) { + return new UnboundPaimonTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } throw new AnalysisException( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 892441752c572c..a1631253df0add 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -64,6 +64,7 @@ import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode; import org.apache.doris.datasource.odbc.source.OdbcScanNode; +import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.paimon.source.PaimonScanNode; import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable; import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode; @@ -153,6 +154,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPaimonTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; @@ -209,6 +211,7 @@ import org.apache.doris.planner.NestedLoopJoinNode; import 
org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.PaimonTableSink; import org.apache.doris.planner.PartitionSortNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNode; @@ -597,6 +600,20 @@ public PlanFragment visitPhysicalIcebergTableSink(PhysicalIcebergTableSink paimonTableSink, + PlanTranslatorContext context) { + PlanFragment rootFragment = paimonTableSink.child().accept(this, context); + rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); + List outputExprs = Lists.newArrayList(); + paimonTableSink.getOutput().stream().map(Slot::getExprId) + .forEach(exprId -> outputExprs.add(context.findSlotRef(exprId))); + PaimonTableSink sink = new PaimonTableSink((PaimonExternalTable) paimonTableSink.getTargetTable()); + rootFragment.setSink(sink); + rootFragment.setOutputExprs(outputExprs); + return rootFragment; + } + @Override public PlanFragment visitPhysicalMaxComputeTableSink(PhysicalMaxComputeTableSink mcTableSink, PlanTranslatorContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index d678f6259b316f..41d7f0c233b90d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -88,6 +88,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalOlapScanToPhysicalOlapScan; import org.apache.doris.nereids.rules.implementation.LogicalOlapTableSinkToPhysicalOlapTableSink; import org.apache.doris.nereids.rules.implementation.LogicalOneRowRelationToPhysicalOneRowRelation; +import org.apache.doris.nereids.rules.implementation.LogicalPaimonTableSinkToPhysicalPaimonTableSink; import org.apache.doris.nereids.rules.implementation.LogicalPartitionTopNToPhysicalPartitionTopN; import 
org.apache.doris.nereids.rules.implementation.LogicalProjectToPhysicalProject; import org.apache.doris.nereids.rules.implementation.LogicalRecursiveUnionAnchorToPhysicalRecursiveUnionAnchor; @@ -237,6 +238,7 @@ public class RuleSet { .add(new LogicalOlapTableSinkToPhysicalOlapTableSink()) .add(new LogicalHiveTableSinkToPhysicalHiveTableSink()) .add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink()) + .add(new LogicalPaimonTableSinkToPhysicalPaimonTableSink()) .add(new LogicalMaxComputeTableSinkToPhysicalMaxComputeTableSink()) .add(new LogicalJdbcTableSinkToPhysicalJdbcTableSink()) .add(new LogicalFileSinkToPhysicalFileSink()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 131ce0602c2dea..af5cda3c1dfc99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -38,6 +38,7 @@ public enum RuleType { BINDING_INSERT_BLACKHOLE_SINK(RuleTypeClass.REWRITE), BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE), + BINDING_INSERT_PAIMON_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_MAX_COMPUTE_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE), @@ -555,6 +556,7 @@ public enum RuleType { LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_PAIMON_TABLE_SINK_TO_PHYSICAL_PAIMON_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_MAX_COMPUTE_TABLE_SINK_TO_PHYSICAL_MAX_COMPUTE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), 
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index b887180c69f649..6394177f4a40d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -40,6 +40,8 @@ import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; +import org.apache.doris.datasource.paimon.PaimonExternalDatabase; +import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.dictionary.Dictionary; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.StatementContext; @@ -50,6 +52,7 @@ import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundMaxComputeTableSink; +import org.apache.doris.nereids.analyzer.UnboundPaimonTableSink; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundTVFTableSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; @@ -86,6 +89,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalPaimonTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalTVFTableSink; @@ -162,6 +166,8 @@ public List buildRules() { 
RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)), RuleType.BINDING_INSERT_ICEBERG_TABLE.build( unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)), + RuleType.BINDING_INSERT_PAIMON_TABLE.build( + unboundPaimonTableSink().thenApply(this::bindPaimonTableSink)), RuleType.BINDING_INSERT_MAX_COMPUTE_TABLE.build( unboundMaxComputeTableSink().thenApply(this::bindMaxComputeTableSink)), RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink)), @@ -823,6 +829,48 @@ private void validateStaticPartition(UnboundIcebergTableSink sink, IcebergExt } } + private Plan bindPaimonTableSink(MatchingContext> ctx) { + UnboundPaimonTableSink sink = ctx.root; + Pair pair = bind(ctx.cascadesContext, sink); + PaimonExternalDatabase database = pair.first; + PaimonExternalTable table = pair.second; + LogicalPlan child = ((LogicalPlan) sink.child()); + + List bindColumns; + if (sink.getColNames().isEmpty()) { + bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList()); + } else { + bindColumns = sink.getColNames().stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(ImmutableList.toImmutableList()); + } + + LogicalPaimonTableSink boundSink = new LogicalPaimonTableSink<>( + database, + table, + bindColumns, + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + sink.getDMLCommandType(), + Optional.empty(), + Optional.empty(), + child); + + if (boundSink.getCols().size() != child.getOutput().size()) { + throw new AnalysisException("insert into cols should be corresponding to the query output"); + } + Map columnToOutput = getColumnToOutput(ctx, table, false, + boundSink, child); + LogicalProject fullOutputProject = 
getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput); + return boundSink.withChildAndUpdateOutput(fullOutputProject); + } + private Plan bindMaxComputeTableSink(MatchingContext> ctx) { UnboundMaxComputeTableSink sink = ctx.root; Pair pair = bind(ctx.cascadesContext, sink); @@ -1033,6 +1081,18 @@ private Pair bind(CascadesContext throw new AnalysisException("the target table of insert into is not an iceberg table"); } + private Pair bind(CascadesContext cascadesContext, + UnboundPaimonTableSink sink) { + List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), + sink.getNameParts()); + Pair, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, + cascadesContext.getConnectContext().getEnv(), Optional.empty()); + if (pair.second instanceof PaimonExternalTable) { + return Pair.of(((PaimonExternalDatabase) pair.first), (PaimonExternalTable) pair.second); + } + throw new AnalysisException("the target table of insert into is not a Paimon table"); + } + private Pair bind(CascadesContext cascadesContext, UnboundMaxComputeTableSink sink) { List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionRewrite.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionRewrite.java index f9aaf46d5afcfd..587e622abb2de3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionRewrite.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionRewrite.java @@ -111,6 +111,7 @@ public List buildRules() { new LogicalHiveTableSinkRewrite().build(), new LogicalIcebergTableSinkRewrite().build(), new LogicalMaxComputeTableSinkRewrite().build(), + new LogicalPaimonTableSinkRewrite().build(), new LogicalJdbcTableSinkRewrite().build(), new LogicalOlapTableSinkRewrite().build(), new LogicalDictionarySinkRewrite().build(), @@ -527,6 
+528,14 @@ public Rule build() { } } + private class LogicalPaimonTableSinkRewrite extends OneRewriteRuleFactory { + @Override + public Rule build() { + return logicalPaimonTableSink().thenApply(ExpressionRewrite.this::applyRewriteToSink) + .toRule(RuleType.REWRITE_SINK_EXPRESSION); + } + } + private class LogicalJdbcTableSinkRewrite extends OneRewriteRuleFactory { @Override public Rule build() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPaimonTableSinkToPhysicalPaimonTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPaimonTableSinkToPhysicalPaimonTableSink.java new file mode 100644 index 00000000000000..b1c03c2d5be27b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPaimonTableSinkToPhysicalPaimonTableSink.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPaimonTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPaimonTableSink; + +import java.util.Optional; + +/** + * Implementation rule that convert logical PaimonTableSink to physical PaimonTableSink. + */ +public class LogicalPaimonTableSinkToPhysicalPaimonTableSink extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalPaimonTableSink().thenApply(ctx -> { + LogicalPaimonTableSink sink = ctx.root; + return new PhysicalPaimonTableSink<>( + sink.getDatabase(), + sink.getTargetTable(), + sink.getCols(), + sink.getOutputExprs(), + Optional.empty(), + sink.getLogicalProperties(), + null, + null, + sink.child()); + }).toRule(RuleType.LOGICAL_PAIMON_TABLE_SINK_TO_PHYSICAL_PAIMON_TABLE_SINK_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index dcff41a31751cb..77b4b801177e21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -50,6 +50,7 @@ public enum PlanType { LOGICAL_OLAP_TABLE_SINK, LOGICAL_HIVE_TABLE_SINK, LOGICAL_ICEBERG_TABLE_SINK, + LOGICAL_PAIMON_TABLE_SINK, LOGICAL_MAX_COMPUTE_TABLE_SINK, LOGICAL_JDBC_TABLE_SINK, LOGICAL_RESULT_SINK, @@ -58,6 +59,7 @@ public enum PlanType { LOGICAL_UNBOUND_OLAP_TABLE_SINK, LOGICAL_UNBOUND_HIVE_TABLE_SINK, LOGICAL_UNBOUND_ICEBERG_TABLE_SINK, + LOGICAL_UNBOUND_PAIMON_TABLE_SINK, LOGICAL_UNBOUND_MAX_COMPUTE_TABLE_SINK, LOGICAL_UNBOUND_JDBC_TABLE_SINK, LOGICAL_UNBOUND_RESULT_SINK, @@ -121,6 +123,7 @@ public enum PlanType { PHYSICAL_OLAP_TABLE_SINK, 
PHYSICAL_HIVE_TABLE_SINK, PHYSICAL_ICEBERG_TABLE_SINK, + PHYSICAL_PAIMON_TABLE_SINK, PHYSICAL_MAX_COMPUTE_TABLE_SINK, PHYSICAL_JDBC_TABLE_SINK, PHYSICAL_RESULT_SINK, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index 8aeb56c7246b68..cf48a662a8a3b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -176,6 +176,7 @@ protected final void execImpl(StmtExecutor executor) throws Exception { coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); coordinator.setQueryType(TQueryType.LOAD); coordinator.setIsProfileSafeStmt(executor.isProfileSafeStmt()); + coordinator.setTxnId(txnId); executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile()); QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator); QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index ff1999572057bd..95d65d52cb10d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -36,6 +36,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; +import org.apache.doris.datasource.paimon.PaimonExternalTable; 
import org.apache.doris.dictionary.Dictionary; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; @@ -73,6 +74,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalMaxComputeTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPaimonTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -494,6 +496,21 @@ ExecutorFactory selectInsertExecutorFactory( emptyInsert, jobId ) ); + } else if (physicalSink instanceof PhysicalPaimonTableSink) { + boolean emptyInsert = childIsEmptyRelation(physicalSink); + PaimonExternalTable paimonExternalTable = (PaimonExternalTable) targetTableIf; + PaimonInsertCommandContext paimonInsertCtx = insertCtx + .map(insertCommandContext -> (PaimonInsertCommandContext) insertCommandContext) + .orElseGet(PaimonInsertCommandContext::new); + return ExecutorFactory.from( + planner, + dataSink, + physicalSink, + () -> new PaimonInsertExecutor(ctx, paimonExternalTable, label, planner, + Optional.of(paimonInsertCtx), + emptyInsert, jobId + ) + ); } else if (physicalSink instanceof PhysicalMaxComputeTableSink) { boolean emptyInsert = childIsEmptyRelation(physicalSink); MaxComputeExternalTable mcExternalTable = (MaxComputeExternalTable) targetTableIf; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index b5561f0d77e2c1..db0137b7a0330e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.analyzer.UnboundInlineTable; import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundMaxComputeTableSink; +import org.apache.doris.nereids.analyzer.UnboundPaimonTableSink; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundStar; import org.apache.doris.nereids.analyzer.UnboundTableSink; @@ -611,6 +612,8 @@ public static List getTargetTableQualified(Plan plan, ConnectContext ctx unboundTableSink = (UnboundBlackholeSink) plan; } else if (plan instanceof UnboundMaxComputeTableSink) { unboundTableSink = (UnboundMaxComputeTableSink) plan; + } else if (plan instanceof UnboundPaimonTableSink) { + unboundTableSink = (UnboundPaimonTableSink) plan; } else { throw new AnalysisException( "the root of plan only accept Olap, Dictionary, Hive, Iceberg or Jdbc table sink, but it is " diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/PaimonInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/PaimonInsertCommandContext.java new file mode 100644 index 00000000000000..158c1a10f04732 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/PaimonInsertCommandContext.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +/** + * For Paimon External Table + */ +public class PaimonInsertCommandContext extends BaseExternalTableInsertCommandContext { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/PaimonInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/PaimonInsertExecutor.java new file mode 100644 index 00000000000000..086563d29ccf8d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/PaimonInsertExecutor.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.paimon.PaimonExternalTable; +import org.apache.doris.datasource.paimon.PaimonTransaction; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.transaction.TransactionType; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for paimon table + */ +public class PaimonInsertExecutor extends BaseExternalTableInsertExecutor { + private static final Logger LOG = LogManager.getLogger(PaimonInsertExecutor.class); + + /** + * constructor + */ + public PaimonInsertExecutor(ConnectContext ctx, PaimonExternalTable table, + String labelName, NereidsPlanner planner, + Optional insertCtx, + boolean emptyInsert, long jobId) { + super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId); + } + + @Override + protected void beforeExec() throws UserException { + PaimonTransaction transaction = (PaimonTransaction) transactionManager.getTransaction(txnId); + transaction.beginInsert((PaimonExternalTable) table, insertCtx); + } + + @Override + protected void doBeforeCommit() throws UserException { + PaimonTransaction transaction = (PaimonTransaction) transactionManager.getTransaction(txnId); + this.loadedRows = transaction.getUpdateCnt(); + transaction.finishInsert(); + } + + @Override + protected TransactionType transactionType() { + return TransactionType.PAIMON; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPaimonTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPaimonTableSink.java new file mode 100644 index 00000000000000..2bc6e7cade27a5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPaimonTableSink.java @@ -0,0 +1,149 @@ +// 
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.paimon.PaimonExternalDatabase; +import org.apache.doris.datasource.paimon.PaimonExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.PropagateFuncDeps; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * logical paimon table sink for insert command + */ +public class LogicalPaimonTableSink extends LogicalTableSink + implements Sink, PropagateFuncDeps { + private final 
PaimonExternalDatabase database; + private final PaimonExternalTable targetTable; + private final DMLCommandType dmlCommandType; + + /** + * constructor + */ + public LogicalPaimonTableSink(PaimonExternalDatabase database, + PaimonExternalTable targetTable, + List cols, + List outputExprs, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(PlanType.LOGICAL_PAIMON_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); + this.database = Objects.requireNonNull(database, "database != null in LogicalPaimonTableSink"); + this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalPaimonTableSink"); + this.dmlCommandType = dmlCommandType; + } + + public Plan withChildAndUpdateOutput(Plan child) { + List output = child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()); + return new LogicalPaimonTableSink<>(database, targetTable, cols, output, + dmlCommandType, Optional.empty(), Optional.empty(), child); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, "LogicalPaimonTableSink only accepts one child"); + return new LogicalPaimonTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + } + + public LogicalPaimonTableSink withOutputExprs(List outputExprs) { + return new LogicalPaimonTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), child()); + } + + public PaimonExternalDatabase getDatabase() { + return database; + } + + public PaimonExternalTable getTargetTable() { + return targetTable; + } + + public DMLCommandType getDmlCommandType() { + return dmlCommandType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + 
if (!super.equals(o)) { + return false; + } + LogicalPaimonTableSink that = (LogicalPaimonTableSink) o; + return dmlCommandType == that.dmlCommandType + && Objects.equals(database, that.database) + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), database, targetTable, cols, dmlCommandType); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalPaimonTableSink[" + id.asInt() + "]", + "outputExprs", outputExprs, + "database", database.getFullName(), + "targetTable", targetTable.getName(), + "cols", cols, + "dmlCommandType", dmlCommandType + ); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalPaimonTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new LogicalPaimonTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new LogicalPaimonTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPaimonTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPaimonTableSink.java new file mode 100644 index 00000000000000..278b74e0271ab6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPaimonTableSink.java @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.paimon.PaimonExternalDatabase; +import org.apache.doris.datasource.paimon.PaimonExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import java.util.List; +import java.util.Optional; + +/** physical paimon table sink */ +public class PhysicalPaimonTableSink extends PhysicalBaseExternalTableSink { + + /** + * constructor + */ + public PhysicalPaimonTableSink(PaimonExternalDatabase database, + PaimonExternalTable targetTable, + List cols, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + CHILD_TYPE child) { + this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties, + PhysicalProperties.GATHER, null, child); + } + + /** + * constructor + */ + public PhysicalPaimonTableSink(PaimonExternalDatabase database, + PaimonExternalTable targetTable, + List cols, + List 
outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, + Statistics statistics, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_PAIMON_TABLE_SINK, database, targetTable, cols, outputExprs, groupExpression, + logicalProperties, physicalProperties, statistics, child); + } + + @Override + public Plan withChildren(List children) { + return new PhysicalPaimonTableSink<>( + (PaimonExternalDatabase) database, (PaimonExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPhysicalPaimonTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new PhysicalPaimonTableSink<>( + (PaimonExternalDatabase) database, (PaimonExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new PhysicalPaimonTableSink<>( + (PaimonExternalDatabase) database, (PaimonExternalTable) targetTable, cols, outputExprs, + groupExpression, logicalProperties.get(), children.get(0)); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalPaimonTableSink<>( + (PaimonExternalDatabase) database, (PaimonExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + @Override + public PhysicalProperties getRequirePhysicalProperties() { + return PhysicalProperties.SINK_RANDOM_PARTITIONED; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java index 42c2966889581f..0b99fd47447367 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java @@ -23,6 +23,7 @@ import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundMaxComputeTableSink; +import org.apache.doris.nereids.analyzer.UnboundPaimonTableSink; import org.apache.doris.nereids.analyzer.UnboundResultSink; import org.apache.doris.nereids.analyzer.UnboundTVFTableSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; @@ -36,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalMaxComputeTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalPaimonTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.logical.LogicalTVFTableSink; @@ -49,6 +51,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalMaxComputeTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPaimonTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFTableSink; @@ -83,6 +86,10 @@ default R visitUnboundIcebergTableSink(UnboundIcebergTableSink u return visitLogicalSink(unboundTableSink, context); } + 
default R visitUnboundPaimonTableSink(UnboundPaimonTableSink unboundTableSink, C context) { + return visitLogicalSink(unboundTableSink, context); + } + default R visitUnboundJdbcTableSink(UnboundJdbcTableSink unboundTableSink, C context) { return visitLogicalSink(unboundTableSink, context); } @@ -131,6 +138,10 @@ default R visitLogicalIcebergTableSink(LogicalIcebergTableSink i return visitLogicalTableSink(icebergTableSink, context); } + default R visitLogicalPaimonTableSink(LogicalPaimonTableSink paimonTableSink, C context) { + return visitLogicalTableSink(paimonTableSink, context); + } + default R visitLogicalMaxComputeTableSink(LogicalMaxComputeTableSink mcTableSink, C context) { return visitLogicalTableSink(mcTableSink, context); } @@ -191,6 +202,10 @@ default R visitPhysicalIcebergTableSink(PhysicalIcebergTableSink return visitPhysicalTableSink(icebergTableSink, context); } + default R visitPhysicalPaimonTableSink(PhysicalPaimonTableSink paimonTableSink, C context) { + return visitPhysicalTableSink(paimonTableSink, context); + } + default R visitPhysicalMaxComputeTableSink(PhysicalMaxComputeTableSink mcTableSink, C context) { return visitPhysicalTableSink(mcTableSink, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java index 1c55def772c475..b26e9bda959ac9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java @@ -28,6 +28,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; import org.apache.doris.datasource.odbc.sink.OdbcTableSink; +import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TExplainLevel; @@ -77,6 +78,8 @@ public static DataSink createDataSink(TableIf table) throws AnalysisException { 
return new IcebergTableSink((IcebergExternalTable) table); } else if (table instanceof MaxComputeExternalTable) { return new MaxComputeTableSink((MaxComputeExternalTable) table); + } else if (table instanceof PaimonExternalTable) { + return new PaimonTableSink((PaimonExternalTable) table); } else { throw new AnalysisException("Unknown table type " + table.getType()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java new file mode 100644 index 00000000000000..3cb10b7ec91898 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PaimonTableSink.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonExternalTable; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext; +import org.apache.doris.nereids.trees.plans.commands.insert.PaimonInsertCommandContext; +import org.apache.doris.thrift.TDataSink; +import org.apache.doris.thrift.TDataSinkType; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THiveColumn; +import org.apache.doris.thrift.THiveColumnType; +import org.apache.doris.thrift.TPaimonTableSink; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class PaimonTableSink extends BaseExternalTableDataSink { + + private final PaimonExternalTable targetTable; + private static final HashSet supportedTypes = new HashSet() {{ + add(TFileFormatType.FORMAT_ORC); + add(TFileFormatType.FORMAT_PARQUET); + }}; + + public PaimonTableSink(PaimonExternalTable targetTable) { + super(); + this.targetTable = targetTable; + } + + @Override + protected Set supportedFileFormatTypes() { + return supportedTypes; + } + + @Override + public String getExplainString(String prefix, TExplainLevel explainLevel) { + StringBuilder strBuilder = new StringBuilder(); + strBuilder.append(prefix).append("PAIMON TABLE SINK\n"); + if (explainLevel == TExplainLevel.BRIEF) { + return strBuilder.toString(); + } + 
strBuilder.append(prefix).append("Table: ") + .append(targetTable.getDbName()).append(".").append(targetTable.getName()).append("\n"); + return strBuilder.toString(); + } + + @Override + public void bindDataSink(Optional insertCtx) + throws AnalysisException { + TPaimonTableSink tSink = new TPaimonTableSink(); + + tSink.setDbName(targetTable.getDbName()); + tSink.setTbName(targetTable.getName()); + + // Get paimon table metadata + Table paimonTable = targetTable.getPaimonTable(Optional.empty()); + + // file format - get from table options, default to parquet + String fileFormat = paimonTable.options().getOrDefault( + CoreOptions.FILE_FORMAT.key(), "parquet"); + TFileFormatType formatType = getTFileFormatType(fileFormat.toUpperCase()); + tSink.setFileFormat(formatType); + + // compression type + TFileCompressType compressType = formatType == TFileFormatType.FORMAT_PARQUET + ? TFileCompressType.SNAPPYBLOCK : TFileCompressType.ZLIB; + tSink.setCompressionType(compressType); + + // output path - use paimon table location + String location = ((FileStoreTable) paimonTable).location().toString(); + tSink.setOutputPath(location); + + // file type (S3, HDFS, etc.) 
+ TFileType fileType = LocationPath.getTFileTypeForBE(location); + tSink.setFileType(fileType); + if (fileType == TFileType.FILE_BROKER) { + tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName())); + } + + // hadoop config from catalog properties + PaimonExternalCatalog catalog = (PaimonExternalCatalog) targetTable.getCatalog(); + Map props = new HashMap<>(catalog.getPaimonOptionsMap()); + tSink.setHadoopConfig(props); + + // overwrite flag + if (insertCtx.isPresent()) { + PaimonInsertCommandContext context = (PaimonInsertCommandContext) insertCtx.get(); + tSink.setOverwrite(context.isOverwrite()); + } + + // partition column names + List partitionKeys = paimonTable.partitionKeys(); + if (!partitionKeys.isEmpty()) { + tSink.setPartitionColumns(partitionKeys); + } + + // column descriptors (PARTITION_KEY vs REGULAR) - used by BE to identify partition columns + Set partitionKeySet = new HashSet<>(partitionKeys); + List allColumns = targetTable.getColumns(); + List columns = new ArrayList<>(); + for (Column col : allColumns) { + THiveColumn tCol = new THiveColumn(); + tCol.setName(col.getName()); + tCol.setColumnType(partitionKeySet.contains(col.getName()) + ? 
THiveColumnType.PARTITION_KEY : THiveColumnType.REGULAR); + columns.add(tCol); + } + tSink.setColumns(columns); + + tDataSink = new TDataSink(TDataSinkType.PAIMON_TABLE_SINK); + tDataSink.setPaimonTableSink(tSink); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index d0f002a3f15fd6..60561560c0d7d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -40,6 +40,7 @@ import org.apache.doris.datasource.hive.HMSTransaction; import org.apache.doris.datasource.iceberg.IcebergTransaction; import org.apache.doris.datasource.maxcompute.MCTransaction; +import org.apache.doris.datasource.paimon.PaimonTransaction; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlCommand; @@ -2448,6 +2449,8 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { } } + LOG.info("updateFragmentExecStatus: fragmentId={}, done={}, isSetPaimonCommitDatas={}, txnId={}", + params.getFragmentId(), params.isDone(), params.isSetPaimonCommitDatas(), txnId); PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId())); if (ctx == null || !ctx.updatePipelineStatus(params)) { LOG.debug("Fragment {} is not done, ignore report status: {}", @@ -2517,6 +2520,14 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { ((MCTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId)) .updateMCCommitData(params.getMcCommitDatas()); } + if (params.isSetPaimonCommitDatas()) { + LOG.info("updateFragmentExecStatus: updating paimon commit data, txnId={}, count={}", + txnId, params.getPaimonCommitDatas().size()); + ((PaimonTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId)) + 
.updatePaimonCommitData(params.getPaimonCommitDatas()); + } else { + LOG.info("updateFragmentExecStatus: paimon commit data NOT set in params"); + } if (ctx.done) { if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java index 45995a7aad72d3..d8462055cf31a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java @@ -24,6 +24,7 @@ import org.apache.doris.datasource.hive.HMSTransaction; import org.apache.doris.datasource.iceberg.IcebergTransaction; import org.apache.doris.datasource.maxcompute.MCTransaction; +import org.apache.doris.datasource.paimon.PaimonTransaction; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.AbstractJobProcessor; import org.apache.doris.qe.CoordinatorContext; @@ -234,6 +235,10 @@ protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleF ((MCTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId)) .updateMCCommitData(params.getMcCommitDatas()); } + if (params.isSetPaimonCommitDatas()) { + ((PaimonTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId)) + .updatePaimonCommitData(params.getPaimonCommitDatas()); + } if (fragmentTask.isDone()) { if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PaimonTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PaimonTransactionManager.java new file mode 100644 index 00000000000000..a3e9b992e62a8e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PaimonTransactionManager.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +import org.apache.doris.datasource.paimon.PaimonMetadataOps; +import org.apache.doris.datasource.paimon.PaimonTransaction; + +public class PaimonTransactionManager extends AbstractExternalTransactionManager { + + public PaimonTransactionManager(PaimonMetadataOps ops) { + super(ops); + } + + @Override + PaimonTransaction createTransaction() { + return new PaimonTransaction((PaimonMetadataOps) ops); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java index e08f13ad0a8734..01833129b8f3cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java @@ -20,6 +20,7 @@ import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.iceberg.IcebergMetadataOps; import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonMetadataOps; import org.apache.doris.fs.FileSystemProvider; import java.util.concurrent.Executor; @@ -38,4 +39,8 @@ public static TransactionManager 
createIcebergTransactionManager(IcebergMetadata public static TransactionManager createMCTransactionManager(MaxComputeExternalCatalog catalog) { return new MCTransactionManager(catalog); } + + public static TransactionManager createPaimonTransactionManager(PaimonMetadataOps ops) { + return new PaimonTransactionManager(ops); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java index 83e092c0ed7136..d6a587eaa2effe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java @@ -22,5 +22,6 @@ public enum TransactionType { HMS, ICEBERG, JDBC, - MAXCOMPUTE + MAXCOMPUTE, + PAIMON } diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 8add59d47af770..de0ef25ec1231c 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -44,6 +44,7 @@ enum TDataSinkType { BLACKHOLE_SINK = 16, TVF_TABLE_SINK = 17, MAXCOMPUTE_TABLE_SINK = 18, + PAIMON_TABLE_SINK = 19, } enum TResultSinkType { @@ -516,6 +517,28 @@ struct TMaxComputeTableSink { 16: optional map properties // contains authentication properties } +struct TPaimonCommitData { + 1: optional string file_path + 2: optional i64 row_count + 3: optional i64 file_size + 4: optional list partition_values +} + +struct TPaimonTableSink { + 1: optional string db_name + 2: optional string tb_name + 3: optional PlanNodes.TFileFormatType file_format + 4: optional string output_path + 5: optional map hadoop_config + 6: optional bool overwrite + 7: optional Types.TFileType file_type + 8: optional PlanNodes.TFileCompressType compression_type + 9: optional list broker_addresses; + 10: optional list partition_columns + 11: optional string paimon_table_serialized // serialized Paimon Table object for JNI writer + 12: optional list columns // column descriptors 
(PARTITION_KEY vs REGULAR) +} + struct TDataSink { 1: required TDataSinkType type 2: optional TDataStreamSink stream_sink @@ -534,4 +557,5 @@ struct TDataSink { 16: optional TBlackholeSink blackhole_sink 17: optional TTVFTableSink tvf_table_sink 18: optional TMaxComputeTableSink max_compute_table_sink + 19: optional TPaimonTableSink paimon_table_sink } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 82a2d40454f008..96f83f5478f783 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -318,6 +318,8 @@ struct TReportExecStatusParams { 32: optional list mc_commit_datas 33: optional string first_error_msg + + 34: optional list paimon_commit_datas } struct TFeResult { From dfe250859f21aab273ea32947acacac278939e6a Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Wed, 18 Mar 2026 15:13:57 +0800 Subject: [PATCH 2/4] add ut and regression test --- .../paimon/PaimonTransactionTest.java | 225 +++++++++++++++++ .../doris/planner/PaimonTableSinkTest.java | 231 ++++++++++++++++++ .../write/test_paimon_write_insert.groovy | 211 ++++++++++++++++ 3 files changed, 667 insertions(+) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonTransactionTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/planner/PaimonTableSinkTest.java create mode 100644 regression-test/suites/external_table_p0/paimon/write/test_paimon_write_insert.groovy diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonTransactionTest.java new file mode 100644 index 00000000000000..1098644ff37556 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonTransactionTest.java @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TPaimonCommitData; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class PaimonTransactionTest { + + private PaimonMetadataOps ops; + private PaimonExternalCatalog catalog; + private PaimonExternalTable table; + private FileStoreTable paimonTable; + private BatchWriteBuilder writeBuilder; + private BatchTableCommit committer; + + @Before + public void setUp() throws Exception { + ops = Mockito.mock(PaimonMetadataOps.class); + catalog = Mockito.mock(PaimonExternalCatalog.class); + table = Mockito.mock(PaimonExternalTable.class); + paimonTable = 
Mockito.mock(FileStoreTable.class); + writeBuilder = Mockito.mock(BatchWriteBuilder.class); + committer = Mockito.mock(BatchTableCommit.class); + + Catalog paimonCatalog = Mockito.mock(Catalog.class); + + Mockito.when(table.getCatalog()).thenReturn(catalog); + Mockito.when(table.getDbName()).thenReturn("test_db"); + Mockito.when(table.getName()).thenReturn("test_tbl"); + Mockito.when(catalog.getPaimonCatalog()).thenReturn(paimonCatalog); + Mockito.when(paimonCatalog.getTable(Identifier.create("test_db", "test_tbl"))) + .thenReturn(paimonTable); + + // Non-partitioned table schema + TableSchema schema = Mockito.mock(TableSchema.class); + Mockito.when(schema.id()).thenReturn(1L); + Mockito.when(paimonTable.schema()).thenReturn(schema); + Mockito.when(paimonTable.partitionKeys()).thenReturn(Collections.emptyList()); + RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING()); + Mockito.when(paimonTable.rowType()).thenReturn(rowType); + + Mockito.when(paimonTable.newBatchWriteBuilder()).thenReturn(writeBuilder); + Mockito.when(writeBuilder.newCommit()).thenReturn(committer); + Mockito.when(writeBuilder.withOverwrite()).thenReturn(writeBuilder); + } + + @Test + public void testUpdatePaimonCommitDataIsThreadSafe() { + PaimonTransaction txn = new PaimonTransaction(ops); + + TPaimonCommitData d1 = new TPaimonCommitData(); + d1.setFilePath("/path/to/data-1.parquet"); + d1.setRowCount(100L); + d1.setFileSize(1024L); + + TPaimonCommitData d2 = new TPaimonCommitData(); + d2.setFilePath("/path/to/data-2.parquet"); + d2.setRowCount(200L); + d2.setFileSize(2048L); + + txn.updatePaimonCommitData(Collections.singletonList(d1)); + txn.updatePaimonCommitData(Collections.singletonList(d2)); + + Assert.assertEquals(300L, txn.getUpdateCnt()); + } + + @Test + public void testGetUpdateCntSumsRowCounts() { + PaimonTransaction txn = new PaimonTransaction(ops); + + List batch = Arrays.asList( + makeCommitData("/path/file1.parquet", 50L, 512L), + makeCommitData("/path/file2.parquet", 
75L, 768L), + makeCommitData("/path/file3.parquet", 25L, 256L)); + txn.updatePaimonCommitData(batch); + + Assert.assertEquals(150L, txn.getUpdateCnt()); + } + + @Test + public void testGetUpdateCntReturnsZeroWhenNoData() { + PaimonTransaction txn = new PaimonTransaction(ops); + Assert.assertEquals(0L, txn.getUpdateCnt()); + } + + @Test + public void testBeginInsertInitializesCommitter() throws UserException { + PaimonTransaction txn = new PaimonTransaction(ops); + txn.beginInsert(table, java.util.Optional.empty()); + + // beginInsert should call newBatchWriteBuilder and newCommit + Mockito.verify(paimonTable).newBatchWriteBuilder(); + Mockito.verify(writeBuilder).newCommit(); + } + + @Test + public void testCommitCallsCommitterWithCorrectFileCount() throws Exception { + PaimonTransaction txn = new PaimonTransaction(ops); + txn.beginInsert(table, java.util.Optional.empty()); + + txn.updatePaimonCommitData(Arrays.asList( + makeCommitData("hdfs://nn/warehouse/db/tbl/bucket-0/data-0.parquet", 100L, 1024L), + makeCommitData("hdfs://nn/warehouse/db/tbl/bucket-0/data-1.parquet", 200L, 2048L))); + + txn.commit(); + + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + Mockito.verify(committer).commit(captor.capture()); + Assert.assertEquals(2, captor.getValue().size()); + Mockito.verify(committer).close(); + } + + @Test + public void testCommitWithEmptyDataCommitsEmptyList() throws Exception { + PaimonTransaction txn = new PaimonTransaction(ops); + txn.beginInsert(table, java.util.Optional.empty()); + txn.commit(); + + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + Mockito.verify(committer).commit(captor.capture()); + Assert.assertTrue(captor.getValue().isEmpty()); + } + + @Test + public void testRollbackClosesCommitterWithoutCommit() throws Exception { + PaimonTransaction txn = new PaimonTransaction(ops); + txn.beginInsert(table, java.util.Optional.empty()); + 
txn.updatePaimonCommitData(Collections.singletonList( + makeCommitData("/path/data.parquet", 100L, 1024L))); + txn.rollback(); + + Mockito.verify(committer, Mockito.never()).commit(Mockito.anyList()); + Mockito.verify(committer).close(); + } + + @Test + public void testFileNameExtractedFromFullPath() throws Exception { + PaimonTransaction txn = new PaimonTransaction(ops); + txn.beginInsert(table, java.util.Optional.empty()); + + // path contains directories - only the filename should be used in DataFileMeta + txn.updatePaimonCommitData(Collections.singletonList( + makeCommitData("s3://bucket/warehouse/db/tbl/bucket-0/00001-abc.parquet", 10L, 100L))); + + txn.commit(); + + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + Mockito.verify(committer).commit(captor.capture()); + Assert.assertEquals(1, captor.getValue().size()); + // CommitMessage itself is an opaque object; just verify commit was called with one message + } + + @Test + public void testBeginInsertWithOverwriteContext() throws UserException { + PaimonTransaction txn = new PaimonTransaction(ops); + org.apache.doris.nereids.trees.plans.commands.insert.PaimonInsertCommandContext ctx = + new org.apache.doris.nereids.trees.plans.commands.insert.PaimonInsertCommandContext(true); + txn.beginInsert(table, java.util.Optional.of(ctx)); + + // withOverwrite() should be called on the writeBuilder + Mockito.verify(writeBuilder).withOverwrite(); + } + + @Test(expected = UserException.class) + public void testBeginInsertThrowsOnCatalogError() throws Exception { + Catalog badCatalog = Mockito.mock(Catalog.class); + Mockito.when(catalog.getPaimonCatalog()).thenReturn(badCatalog); + Mockito.when(badCatalog.getTable(Mockito.any())) + .thenThrow(new Catalog.TableNotExistException( + Identifier.create("test_db", "test_tbl"))); + + PaimonTransaction txn = new PaimonTransaction(ops); + txn.beginInsert(table, java.util.Optional.empty()); + } + + // 
------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private TPaimonCommitData makeCommitData(String path, long rowCount, long fileSize) { + TPaimonCommitData d = new TPaimonCommitData(); + d.setFilePath(path); + d.setRowCount(rowCount); + d.setFileSize(fileSize); + return d; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonTableSinkTest.java new file mode 100644 index 00000000000000..c88c0ac261e58f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonTableSinkTest.java @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonExternalTable; +import org.apache.doris.nereids.trees.plans.commands.insert.PaimonInsertCommandContext; +import org.apache.doris.thrift.TDataSinkType; +import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THiveColumnType; +import org.apache.doris.thrift.TPaimonTableSink; + +import mockit.Mock; +import mockit.MockUp; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class PaimonTableSinkTest { + + /** + * Test that bindDataSink() correctly builds TPaimonTableSink for a non-partitioned parquet table. 
+ */ + @Test + public void testBindDataSinkParquetNonPartitioned() throws AnalysisException { + List columns = Arrays.asList( + new Column("k", PrimitiveType.INT), + new Column("v", PrimitiveType.STRING)); + + mockPaimonTable("s3://my-bucket/warehouse/db/tbl", + "parquet", Collections.emptyList(), columns, new HashMap<>()); + + PaimonExternalTable table = buildMockTable("test_db", "test_tbl", columns); + PaimonTableSink sink = new PaimonTableSink(table); + sink.bindDataSink(Optional.empty()); + + Assert.assertNotNull(sink.tDataSink); + Assert.assertEquals(TDataSinkType.PAIMON_TABLE_SINK, sink.tDataSink.getType()); + + TPaimonTableSink tSink = sink.tDataSink.getPaimonTableSink(); + Assert.assertEquals("test_db", tSink.getDbName()); + Assert.assertEquals("test_tbl", tSink.getTbName()); + Assert.assertEquals(TFileFormatType.FORMAT_PARQUET, tSink.getFileFormat()); + Assert.assertEquals(TFileCompressType.SNAPPYBLOCK, tSink.getCompressionType()); + Assert.assertEquals("s3://my-bucket/warehouse/db/tbl", tSink.getOutputPath()); + Assert.assertEquals(TFileType.FILE_S3, tSink.getFileType()); + Assert.assertFalse(tSink.isSetPartitionColumns()); + + // columns: all REGULAR since no partition keys + Assert.assertEquals(2, tSink.getColumns().size()); + tSink.getColumns().forEach(c -> Assert.assertEquals(THiveColumnType.REGULAR, c.getColumnType())); + } + + /** + * Test that bindDataSink() correctly builds TPaimonTableSink for an ORC partitioned table. 
+ */ + @Test + public void testBindDataSinkOrcPartitioned() throws AnalysisException { + List columns = Arrays.asList( + new Column("k", PrimitiveType.INT), + new Column("v", PrimitiveType.STRING), + new Column("dt", PrimitiveType.STRING)); + List partitionKeys = Collections.singletonList("dt"); + + mockPaimonTable("hdfs://namenode:9000/warehouse/db/tbl", + "orc", partitionKeys, columns, new HashMap<>()); + + PaimonExternalTable table = buildMockTable("test_db", "part_tbl", columns); + PaimonTableSink sink = new PaimonTableSink(table); + sink.bindDataSink(Optional.empty()); + + TPaimonTableSink tSink = sink.tDataSink.getPaimonTableSink(); + Assert.assertEquals(TFileFormatType.FORMAT_ORC, tSink.getFileFormat()); + Assert.assertEquals(TFileCompressType.ZLIB, tSink.getCompressionType()); + Assert.assertEquals(TFileType.FILE_HDFS, tSink.getFileType()); + Assert.assertTrue(tSink.isSetPartitionColumns()); + Assert.assertEquals(Collections.singletonList("dt"), tSink.getPartitionColumns()); + + // dt should be PARTITION_KEY, k and v should be REGULAR + Map typeMap = new HashMap<>(); + tSink.getColumns().forEach(c -> typeMap.put(c.getName(), c.getColumnType())); + Assert.assertEquals(THiveColumnType.REGULAR, typeMap.get("k")); + Assert.assertEquals(THiveColumnType.REGULAR, typeMap.get("v")); + Assert.assertEquals(THiveColumnType.PARTITION_KEY, typeMap.get("dt")); + } + + /** + * Test that INSERT OVERWRITE sets the overwrite flag in TPaimonTableSink. 
+ */ + @Test + public void testBindDataSinkWithOverwriteContext() throws AnalysisException { + List columns = Arrays.asList( + new Column("k", PrimitiveType.INT), + new Column("v", PrimitiveType.STRING)); + + mockPaimonTable("s3://my-bucket/warehouse/db/tbl", + "parquet", Collections.emptyList(), columns, new HashMap<>()); + + PaimonExternalTable table = buildMockTable("test_db", "test_tbl", columns); + PaimonTableSink sink = new PaimonTableSink(table); + + PaimonInsertCommandContext ctx = new PaimonInsertCommandContext(true); + sink.bindDataSink(Optional.of(ctx)); + + TPaimonTableSink tSink = sink.tDataSink.getPaimonTableSink(); + Assert.assertTrue(tSink.isOverwrite()); + } + + /** + * Test that hadoop config from catalog is populated in the thrift struct. + */ + @Test + public void testBindDataSinkHadoopConfig() throws AnalysisException { + List columns = Collections.singletonList(new Column("id", PrimitiveType.INT)); + Map catalogOptions = new HashMap<>(); + catalogOptions.put("s3.endpoint", "http://minio:9000"); + catalogOptions.put("s3.access_key", "admin"); + + mockPaimonTable("s3://warehouse/db/tbl", "parquet", + Collections.emptyList(), columns, catalogOptions); + + PaimonExternalTable table = buildMockTable("db", "tbl", columns); + PaimonTableSink sink = new PaimonTableSink(table); + sink.bindDataSink(Optional.empty()); + + TPaimonTableSink tSink = sink.tDataSink.getPaimonTableSink(); + Assert.assertNotNull(tSink.getHadoopConfig()); + Assert.assertEquals("http://minio:9000", tSink.getHadoopConfig().get("s3.endpoint")); + Assert.assertEquals("admin", tSink.getHadoopConfig().get("s3.access_key")); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private PaimonExternalTable buildMockTable(String dbName, String tblName, List columns) { + new MockUp() { + @Mock + public String getDbName() { + return dbName; + } + + @Mock + public 
String getName() { + return tblName; + } + + @Mock + public List getColumns() { + return columns; + } + + @Mock + public PaimonExternalCatalog getCatalog() { + return buildMockCatalog(); + } + }; + // Return null; all methods are mocked via MockUp + return null; + } + + private PaimonExternalCatalog buildMockCatalog() { + return null; // methods mocked via MockUp + } + + /** + * Sets up MockUp for PaimonExternalTable.getPaimonTable() and PaimonExternalCatalog.getPaimonOptionsMap() + * so that PaimonTableSink.bindDataSink() can run without real Paimon infrastructure. + */ + private void mockPaimonTable(String location, String fileFormat, + List partitionKeys, List columns, + Map catalogOptions) { + new MockUp() { + @Mock + public Map getPaimonOptionsMap() { + return new HashMap<>(catalogOptions); + } + }; + + new MockUp() { + @Mock + public org.apache.paimon.table.Table getPaimonTable(Optional> requiredFields) { + FileStoreTable mockTable = org.mockito.Mockito.mock(FileStoreTable.class); + + // location + org.mockito.Mockito.when(mockTable.location()).thenReturn(new Path(location)); + + // options: file format + Map opts = new HashMap<>(); + opts.put(CoreOptions.FILE_FORMAT.key(), fileFormat); + org.mockito.Mockito.when(mockTable.options()).thenReturn(opts); + + // partition keys + org.mockito.Mockito.when(mockTable.partitionKeys()) + .thenReturn(new ArrayList<>(partitionKeys)); + + return mockTable; + } + }; + } +} diff --git a/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_insert.groovy b/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_insert.groovy new file mode 100644 index 00000000000000..8b219401936108 --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/write/test_paimon_write_insert.groovy @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_write_insert", "p0,external") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable paimon test.") + return + } + + String minioPort = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalogName = "test_paimon_write_insert" + String testDb = "test_paimon_write_db" + + sql """drop catalog if exists ${catalogName}""" + sql """ + CREATE CATALOG ${catalogName} PROPERTIES ( + 'type' = 'paimon', + 'warehouse' = 's3://warehouse/wh', + 's3.endpoint' = 'http://${externalEnvIp}:${minioPort}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.path.style.access' = 'true' + ); + """ + + try { + // ---- basic non-partitioned table ---- + def q01 = { + def tbl = "${catalogName}.${testDb}.paimon_write_basic" + sql """drop table if exists ${tbl}""" + sql """ + CREATE TABLE ${tbl} ( + id INT, + name STRING, + score DOUBLE + ) ENGINE=paimon + """ + + // INSERT INTO with VALUES + sql """ INSERT INTO ${tbl} VALUES (1, 'Alice', 95.5) """ + sql """ INSERT INTO ${tbl} VALUES (2, 'Bob', 88.0), (3, 'Carol', 72.3) """ + sql """ REFRESH CATALOG ${catalogName} """ + 
order_qt_basic_all """ SELECT * FROM ${tbl} ORDER BY id """ + + // INSERT INTO with SELECT from internal table + sql """ INSERT INTO ${tbl} SELECT id + 10, name, score FROM ${tbl} WHERE id <= 2 """ + sql """ REFRESH CATALOG ${catalogName} """ + order_qt_basic_select """ SELECT * FROM ${tbl} ORDER BY id """ + + sql """drop table if exists ${tbl}""" + } + + // ---- partitioned table (string partition) ---- + def q02 = { + def tbl = "${catalogName}.${testDb}.paimon_write_partitioned" + sql """drop table if exists ${tbl}""" + sql """ + CREATE TABLE ${tbl} ( + id INT, + val STRING, + dt STRING + ) ENGINE=paimon + PARTITION BY LIST(dt) () + """ + + sql """ INSERT INTO ${tbl} VALUES (1, 'aaa', '2024-01-01') """ + sql """ INSERT INTO ${tbl} VALUES (2, 'bbb', '2024-01-02'), (3, 'ccc', '2024-01-01') """ + sql """ REFRESH CATALOG ${catalogName} """ + order_qt_part_all """ SELECT * FROM ${tbl} ORDER BY id """ + order_qt_part_filter """ SELECT * FROM ${tbl} WHERE dt='2024-01-01' ORDER BY id """ + + sql """drop table if exists ${tbl}""" + } + + // ---- INSERT OVERWRITE ---- + def q03 = { + def tbl = "${catalogName}.${testDb}.paimon_write_overwrite" + sql """drop table if exists ${tbl}""" + sql """ + CREATE TABLE ${tbl} ( + k INT, + v STRING + ) ENGINE=paimon + """ + + sql """ INSERT INTO ${tbl} VALUES (1, 'old_value') """ + sql """ REFRESH CATALOG ${catalogName} """ + order_qt_overwrite_before """ SELECT * FROM ${tbl} ORDER BY k """ + + sql """ INSERT OVERWRITE TABLE ${tbl} VALUES (1, 'new_value'), (2, 'another') """ + sql """ REFRESH CATALOG ${catalogName} """ + order_qt_overwrite_after """ SELECT * FROM ${tbl} ORDER BY k """ + + sql """drop table if exists ${tbl}""" + } + + // ---- various scalar data types ---- + def q04 = { + def tbl = "${catalogName}.${testDb}.paimon_write_types" + sql """drop table if exists ${tbl}""" + sql """ + CREATE TABLE ${tbl} ( + c_boolean BOOLEAN, + c_tinyint TINYINT, + c_smallint SMALLINT, + c_int INT, + c_bigint BIGINT, + c_float FLOAT, + 
c_double DOUBLE, + c_decimal DECIMAL(10,4), + c_string STRING, + c_varchar VARCHAR(128), + c_date DATE, + c_datetime DATETIME + ) ENGINE=paimon + """ + + sql """ + INSERT INTO ${tbl} VALUES ( + true, + 127, + 32767, + 2147483647, + 9223372036854775807, + CAST(3.14 AS FLOAT), + CAST(2.718281828 AS DOUBLE), + CAST(12345.6789 AS DECIMAL(10,4)), + 'hello paimon', + 'varchar_val', + '2024-06-01', + '2024-06-01 12:00:00' + ) + """ + sql """ REFRESH CATALOG ${catalogName} """ + order_qt_types_all """ SELECT * FROM ${tbl} """ + + sql """drop table if exists ${tbl}""" + } + + // ---- INSERT with column list (partial columns) ---- + def q05 = { + def tbl = "${catalogName}.${testDb}.paimon_write_col_list" + sql """drop table if exists ${tbl}""" + sql """ + CREATE TABLE ${tbl} ( + id INT, + name STRING, + age INT + ) ENGINE=paimon + """ + + sql """ INSERT INTO ${tbl} (id, name) VALUES (1, 'test_user') """ + sql """ REFRESH CATALOG ${catalogName} """ + order_qt_col_list """ SELECT * FROM ${tbl} ORDER BY id """ + + sql """drop table if exists ${tbl}""" + } + + // ---- multiple file formats ---- + def q06 = { String fileFormat -> + def tbl = "${catalogName}.${testDb}.paimon_write_fmt_${fileFormat}" + sql """drop table if exists ${tbl}""" + sql """ + CREATE TABLE ${tbl} ( + id INT, + val STRING + ) ENGINE=paimon + PROPERTIES ('file.format' = '${fileFormat}') + """ + + sql """ INSERT INTO ${tbl} VALUES (1, 'row1'), (2, 'row2'), (3, 'row3') """ + sql """ REFRESH CATALOG ${catalogName} """ + order_qt_fmt_${fileFormat} """ SELECT * FROM ${tbl} ORDER BY id """ + + sql """drop table if exists ${tbl}""" + } + + // Create test database + sql """switch ${catalogName}""" + sql """CREATE DATABASE IF NOT EXISTS ${testDb}""" + + // Run all test closures + q01.call() + q02.call() + q03.call() + q04.call() + q05.call() + q06.call("parquet") + q06.call("orc") + + } finally { + sql """switch internal""" + sql """drop catalog if exists ${catalogName}""" + } +} From 
4d6bcd432d77094e3d7533aef3e5f52946a0f4b0 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Wed, 18 Mar 2026 16:13:41 +0800 Subject: [PATCH 3/4] optimize clang format --- .../exec/paimon_table_sink_operator.h | 2 +- .../paimon/vpaimon_partition_writer.cpp | 22 ++++++------ .../writer/paimon/vpaimon_table_writer.cpp | 34 ++++++++----------- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/be/src/pipeline/exec/paimon_table_sink_operator.h b/be/src/pipeline/exec/paimon_table_sink_operator.h index f6b48c7c43b4ad..58944911bebafe 100644 --- a/be/src/pipeline/exec/paimon_table_sink_operator.h +++ b/be/src/pipeline/exec/paimon_table_sink_operator.h @@ -46,7 +46,7 @@ class PaimonTableSinkOperatorX final : public DataSinkOperatorX; PaimonTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, - const std::vector& t_output_expr) + const std::vector& t_output_expr) : Base(operator_id, 0, 0), _row_desc(row_desc), _t_output_expr(t_output_expr), diff --git a/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.cpp b/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.cpp index 89fc52a9f3ad0d..cc257c69626640 100644 --- a/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.cpp +++ b/be/src/vec/sink/writer/paimon/vpaimon_partition_writer.cpp @@ -26,8 +26,7 @@ namespace doris { namespace vectorized { VPaimonPartitionWriter::VPaimonPartitionWriter( - std::vector partition_values, - const VExprContextSPtrs& write_output_expr_ctxs, + std::vector partition_values, const VExprContextSPtrs& write_output_expr_ctxs, std::vector write_column_names, WriteInfo write_info, std::string file_name, int file_name_index, TFileFormatType::type file_format_type, TFileCompressType::type compress_type, @@ -51,8 +50,8 @@ Status VPaimonPartitionWriter::open(RuntimeState* state, RuntimeProfile* profile fs_properties.broker_addresses = &(_write_info.broker_addresses); } // Files go into bucket-0 subdirectory - std::string target_file = 
fmt::format("{}/bucket-0/{}", _write_info.write_path, - _get_target_file_name()); + std::string target_file = + fmt::format("{}/bucket-0/{}", _write_info.write_path, _get_target_file_name()); io::FileDescription file_description = {.path = target_file, .fs_name {}}; _fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description)); io::FileWriterOptions file_writer_options = {.used_by_s3_committer = false}; @@ -91,7 +90,8 @@ Status VPaimonPartitionWriter::open(RuntimeState* state, RuntimeProfile* profile return _file_format_transformer->open(); } default: - return Status::InternalError("Unsupported file format type {}", to_string(_file_format_type)); + return Status::InternalError("Unsupported file format type {}", + to_string(_file_format_type)); } } @@ -111,9 +111,10 @@ Status VPaimonPartitionWriter::close(const Status& status) { } } bool status_ok = result_status.ok() && status.ok(); - LOG(INFO) << fmt::format("VPaimonPartitionWriter::close - result_status.ok()={}, status.ok()={}, status_ok={}, row_count={}, file={}", - result_status.ok(), status.ok(), status_ok, _row_count, - _get_target_file_name()); + LOG(INFO) << fmt::format( + "VPaimonPartitionWriter::close - result_status.ok()={}, status.ok()={}, " + "status_ok={}, row_count={}, file={}", + result_status.ok(), status.ok(), status_ok, _row_count, _get_target_file_name()); if (!status_ok && _fs != nullptr) { auto path = fmt::format("{}/bucket-0/{}", _write_info.write_path, _get_target_file_name()); Status st = _fs->delete_file(path); @@ -125,8 +126,9 @@ Status VPaimonPartitionWriter::close(const Status& status) { TPaimonCommitData commit_data; _build_paimon_commit_data(&commit_data); _state->add_paimon_commit_datas(commit_data); - LOG(INFO) << fmt::format("Added paimon commit data: file_path={}, row_count={}, file_size={}", - commit_data.file_path, commit_data.row_count, commit_data.file_size); + LOG(INFO) << fmt::format( + "Added paimon commit data: file_path={}, row_count={}, file_size={}", + 
commit_data.file_path, commit_data.row_count, commit_data.file_size); } else { LOG(WARNING) << fmt::format("Did NOT add paimon commit data due to status_ok=false"); } diff --git a/be/src/vec/sink/writer/paimon/vpaimon_table_writer.cpp b/be/src/vec/sink/writer/paimon/vpaimon_table_writer.cpp index d6d0aff40e82e4..17ca5572280fb5 100644 --- a/be/src/vec/sink/writer/paimon/vpaimon_table_writer.cpp +++ b/be/src/vec/sink/writer/paimon/vpaimon_table_writer.cpp @@ -157,9 +157,8 @@ Status VPaimonTableWriter::write(RuntimeState* state, vectorized::Block& block) } const std::string& col_name = paimon_sink.columns[_partition_columns_input_index[j]].name; - partition_name += - VHiveUtils::escape_path_name(col_name) + "=" + - VHiveUtils::escape_path_name(partition_values[j]); + partition_name += VHiveUtils::escape_path_name(col_name) + "=" + + VHiveUtils::escape_path_name(partition_values[j]); } auto create_and_open_writer = @@ -201,8 +200,7 @@ Status VPaimonTableWriter::write(RuntimeState* state, vectorized::Block& block) } writer_positions.erase(writer_it->second); _partitions_to_writers.erase(writer_it); - RETURN_IF_ERROR( - create_and_open_writer(i, &file_name, file_name_index + 1, w)); + RETURN_IF_ERROR(create_and_open_writer(i, &file_name, file_name_index + 1, w)); } else { w = writer_it->second; auto pos_it = writer_positions.find(w); @@ -299,16 +297,15 @@ std::shared_ptr VPaimonTableWriter::_create_partition_wr } } - std::string write_path = partition_path.empty() - ? paimon_sink.output_path - : fmt::format("{}/{}", paimon_sink.output_path, partition_path); + std::string write_path = + partition_path.empty() ? 
paimon_sink.output_path + : fmt::format("{}/{}", paimon_sink.output_path, partition_path); std::string original_write_path = write_path; - VPaimonPartitionWriter::WriteInfo write_info = { - .write_path = write_path, - .original_write_path = original_write_path, - .file_type = paimon_sink.file_type, - .broker_addresses = {}}; + VPaimonPartitionWriter::WriteInfo write_info = {.write_path = write_path, + .original_write_path = original_write_path, + .file_type = paimon_sink.file_type, + .broker_addresses = {}}; if (paimon_sink.__isset.broker_addresses) { write_info.broker_addresses.assign(paimon_sink.broker_addresses.begin(), paimon_sink.broker_addresses.end()); @@ -323,10 +320,9 @@ std::shared_ptr VPaimonTableWriter::_create_partition_wr _write_file_count++; return std::make_shared( - partition_values, _write_output_vexpr_ctxs, column_names, - std::move(write_info), file_name ? *file_name : _compute_file_name(), - file_name_index, paimon_sink.file_format, paimon_sink.compression_type, - paimon_sink.hadoop_config); + partition_values, _write_output_vexpr_ctxs, column_names, std::move(write_info), + file_name ? 
*file_name : _compute_file_name(), file_name_index, paimon_sink.file_format, + paimon_sink.compression_type, paimon_sink.hadoop_config); } std::vector VPaimonTableWriter::_create_partition_values(vectorized::Block& block, @@ -334,8 +330,8 @@ std::vector VPaimonTableWriter::_create_partition_values(vectorized std::vector partition_values; for (int idx : _partition_columns_input_index) { vectorized::ColumnWithTypeAndName col = block.get_by_position(idx); - std::string value = _to_partition_value( - _vec_output_expr_ctxs[idx]->root()->data_type(), col, position); + std::string value = + _to_partition_value(_vec_output_expr_ctxs[idx]->root()->data_type(), col, position); partition_values.emplace_back(value); } return partition_values; From 6d182450702774ea0dd1e93989af67711b483c3f Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Wed, 18 Mar 2026 16:19:45 +0800 Subject: [PATCH 4/4] optimize --- be/src/vec/sink/writer/paimon/vpaimon_table_writer.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/vec/sink/writer/paimon/vpaimon_table_writer.h b/be/src/vec/sink/writer/paimon/vpaimon_table_writer.h index 5853257b1b0bac..876b72ddca7755 100644 --- a/be/src/vec/sink/writer/paimon/vpaimon_table_writer.h +++ b/be/src/vec/sink/writer/paimon/vpaimon_table_writer.h @@ -60,8 +60,8 @@ class VPaimonTableWriter final : public AsyncResultWriter { std::vector _create_partition_values(vectorized::Block& block, int position); - std::string _to_partition_value(const DataTypePtr& type, - const ColumnWithTypeAndName& col, int position); + std::string _to_partition_value(const DataTypePtr& type, const ColumnWithTypeAndName& col, + int position); std::string _compute_file_name();