diff --git a/be/src/cloud/cloud_base_compaction.h b/be/src/cloud/cloud_base_compaction.h index c89047b919beb3..bff72791f23cf1 100644 --- a/be/src/cloud/cloud_base_compaction.h +++ b/be/src/cloud/cloud_base_compaction.h @@ -18,10 +18,12 @@ #pragma once #include +#include #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" #include "storage/compaction/compaction.h" +#include "storage/compaction_task_tracker.h" namespace doris { @@ -34,6 +36,10 @@ class CloudBaseCompaction : public CloudCompactionMixin { Status execute_compact() override; Status request_global_lock(); + std::optional profile_type() const override { + return CompactionProfileType::BASE; + } + void do_lease(); private: diff --git a/be/src/cloud/cloud_compaction_action.cpp b/be/src/cloud/cloud_compaction_action.cpp index d9a7794edca785..b58ed1488c6b0c 100644 --- a/be/src/cloud/cloud_compaction_action.cpp +++ b/be/src/cloud/cloud_compaction_action.cpp @@ -47,6 +47,7 @@ #include "storage/compaction/cumulative_compaction_policy.h" #include "storage/compaction/cumulative_compaction_time_series_policy.h" #include "storage/compaction/full_compaction.h" +#include "storage/compaction_task_tracker.h" #include "storage/olap_define.h" #include "storage/storage_engine.h" #include "storage/tablet/tablet_manager.h" @@ -166,12 +167,13 @@ Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::stri LOG(INFO) << "manual submit compaction task, tablet id: " << tablet_id << " table id: " << table_id; - // 3. submit compaction task + // 3. submit compaction task (trigger_method=1 for MANUAL) RETURN_IF_ERROR(_engine.submit_compaction_task( - tablet, compaction_type == PARAM_COMPACTION_BASE ? CompactionType::BASE_COMPACTION - : compaction_type == PARAM_COMPACTION_CUMULATIVE - ? CompactionType::CUMULATIVE_COMPACTION - : CompactionType::FULL_COMPACTION)); + tablet, + compaction_type == PARAM_COMPACTION_BASE ? 
CompactionType::BASE_COMPACTION + : compaction_type == PARAM_COMPACTION_CUMULATIVE ? CompactionType::CUMULATIVE_COMPACTION + : CompactionType::FULL_COMPACTION, + /*trigger_method=*/1)); LOG(INFO) << "Manual compaction task is successfully triggered, tablet id: " << tablet_id << " table id: " << table_id; diff --git a/be/src/cloud/cloud_cumulative_compaction.h b/be/src/cloud/cloud_cumulative_compaction.h index 174d0d57a97cc7..05eb2d919fc141 100644 --- a/be/src/cloud/cloud_cumulative_compaction.h +++ b/be/src/cloud/cloud_cumulative_compaction.h @@ -18,10 +18,12 @@ #pragma once #include +#include #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" #include "storage/compaction/compaction.h" +#include "storage/compaction_task_tracker.h" namespace doris { #include "common/compile_check_begin.h" @@ -36,6 +38,10 @@ class CloudCumulativeCompaction : public CloudCompactionMixin { Status execute_compact() override; Status request_global_lock(); + std::optional profile_type() const override { + return CompactionProfileType::CUMULATIVE; + } + void do_lease(); int64_t get_input_rowsets_bytes() const { return _input_rowsets_total_size; } diff --git a/be/src/cloud/cloud_full_compaction.h b/be/src/cloud/cloud_full_compaction.h index e5c440e52b9b8a..c318c2c32d0477 100644 --- a/be/src/cloud/cloud_full_compaction.h +++ b/be/src/cloud/cloud_full_compaction.h @@ -18,10 +18,12 @@ #pragma once #include +#include #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" #include "storage/compaction/compaction.h" +#include "storage/compaction_task_tracker.h" namespace doris { @@ -35,6 +37,10 @@ class CloudFullCompaction : public CloudCompactionMixin { Status execute_compact() override; Status request_global_lock(); + std::optional profile_type() const override { + return CompactionProfileType::FULL; + } + void do_lease(); protected: diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 
927d5bef343b73..ff87a4497458da 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -59,6 +59,7 @@ #include "runtime/memory/cache_manager.h" #include "storage/compaction/cumulative_compaction_policy.h" #include "storage/compaction/cumulative_compaction_time_series_policy.h" +#include "storage/compaction_task_tracker.h" #include "storage/storage_policy.h" #include "util/parse_util.h" #include "util/time.h" @@ -796,7 +797,8 @@ Status CloudStorageEngine::_request_tablet_global_compaction_lock( } } -Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) { +Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet, + int trigger_method) { using namespace std::chrono; { std::lock_guard lock(_compaction_mtx); @@ -819,6 +821,26 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t _submitted_base_compactions.erase(tablet->tablet_id()); return st; } + // Register task with CompactionTaskTracker as PENDING + auto* tracker = CompactionTaskTracker::instance(); + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.tablet_id = tablet->tablet_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.compaction_type = CompactionProfileType::BASE; + info.status = CompactionTaskStatus::PENDING; + info.trigger_method = static_cast(trigger_method); + info.scheduled_time_ms = + duration_cast(system_clock::now().time_since_epoch()).count(); + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = compaction->input_row_num_value(); + info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = compaction->input_segments_num_value(); + tracker->register_task(std::move(info)); + } { std::lock_guard lock(_compaction_mtx); _submitted_base_compactions[tablet->tablet_id()] = compaction; @@ 
-830,6 +852,8 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t g_base_compaction_running_task_count << 1; signal::tablet_id = tablet->tablet_id(); Defer defer {[&]() { + // Idempotent cleanup: remove task from tracker + CompactionTaskTracker::instance()->remove_task(compaction_id); g_base_compaction_running_task_count << -1; std::lock_guard lock(_compaction_mtx); _submitted_base_compactions.erase(tablet->tablet_id()); @@ -840,6 +864,13 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet, compaction); if (!st.ok()) return; + // Update tracker to RUNNING after acquiring global lock + { + RunningStats rs; + rs.start_time_ms = + duration_cast(system_clock::now().time_since_epoch()).count(); + CompactionTaskTracker::instance()->update_to_running(compaction_id, rs); + } st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` @@ -852,6 +883,7 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t DorisMetrics::instance()->base_compaction_task_pending_total->set_value( _base_compaction_thread_pool->get_queue_size()); if (!st.ok()) { + tracker->remove_task(compaction_id); std::lock_guard lock(_compaction_mtx); _submitted_base_compactions.erase(tablet->tablet_id()); return Status::InternalError("failed to submit base compaction, tablet_id={}", @@ -860,7 +892,8 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t return st; } -Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet) { +Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet, + int trigger_method) { using namespace std::chrono; { std::lock_guard lock(_compaction_mtx); @@ -893,6 +926,28 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS 
_tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); return st; } + // Register task with CompactionTaskTracker as PENDING + // IMPORTANT: use compaction->compaction_id(), NOT tracker->next_compaction_id(), + // because the Compaction constructor already allocated an ID via the tracker. + auto* tracker = CompactionTaskTracker::instance(); + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.tablet_id = tablet->tablet_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.compaction_type = CompactionProfileType::CUMULATIVE; + info.status = CompactionTaskStatus::PENDING; + info.trigger_method = static_cast(trigger_method); + info.scheduled_time_ms = + duration_cast(system_clock::now().time_since_epoch()).count(); + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = compaction->input_row_num_value(); + info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = compaction->input_segments_num_value(); + tracker->register_task(std::move(info)); + } { std::lock_guard lock(_compaction_mtx); _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); @@ -942,6 +997,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS Defer defer {[&]() { DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep", { sleep(5); }) + // Idempotent cleanup: remove task from tracker + CompactionTaskTracker::instance()->remove_task(compaction_id); std::lock_guard lock(_cumu_compaction_delay_mtx); _cumu_compaction_thread_pool_used_threads--; if (!is_large_task) { @@ -956,6 +1013,13 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION, tablet, compaction); if (!st.ok()) return; + // Update tracker to RUNNING after acquiring global lock + 
{ + RunningStats rs; + rs.start_time_ms = + duration_cast(system_clock::now().time_since_epoch()).count(); + CompactionTaskTracker::instance()->update_to_running(compaction_id, rs); + } do { std::lock_guard lock(_cumu_compaction_delay_mtx); _cumu_compaction_thread_pool_used_threads++; @@ -1007,6 +1071,7 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( _cumu_compaction_thread_pool->get_queue_size()); if (!st.ok()) { + tracker->remove_task(compaction_id); erase_submitted_cumu_compaction(); return Status::InternalError("failed to submit cumu compaction, tablet_id={}", tablet->tablet_id()); @@ -1014,7 +1079,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS return st; } -Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet) { +Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet, + int trigger_method) { using namespace std::chrono; { std::lock_guard lock(_compaction_mtx); @@ -1036,6 +1102,26 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t _submitted_full_compactions.erase(tablet->tablet_id()); return st; } + // Register task with CompactionTaskTracker as PENDING + auto* tracker = CompactionTaskTracker::instance(); + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.tablet_id = tablet->tablet_id(); + info.table_id = tablet->table_id(); + info.partition_id = tablet->partition_id(); + info.compaction_type = CompactionProfileType::FULL; + info.status = CompactionTaskStatus::PENDING; + info.trigger_method = static_cast(trigger_method); + info.scheduled_time_ms = + duration_cast(system_clock::now().time_since_epoch()).count(); + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = compaction->input_row_num_value(); + 
info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = compaction->input_segments_num_value(); + tracker->register_task(std::move(info)); + } { std::lock_guard lock(_compaction_mtx); _submitted_full_compactions[tablet->tablet_id()] = compaction; @@ -1044,6 +1130,8 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t g_full_compaction_running_task_count << 1; signal::tablet_id = tablet->tablet_id(); Defer defer {[&]() { + // Idempotent cleanup: remove task from tracker + CompactionTaskTracker::instance()->remove_task(compaction_id); g_full_compaction_running_task_count << -1; std::lock_guard lock(_compaction_mtx); _submitted_full_compactions.erase(tablet->tablet_id()); @@ -1051,6 +1139,13 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet, compaction); if (!st.ok()) return; + // Update tracker to RUNNING after acquiring global lock + { + RunningStats rs; + rs.start_time_ms = + duration_cast(system_clock::now().time_since_epoch()).count(); + CompactionTaskTracker::instance()->update_to_running(compaction_id, rs); + } st = compaction->execute_compact(); if (!st.ok()) { // Error log has been output in `execute_compact` @@ -1061,6 +1156,7 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t _executing_full_compactions.erase(tablet->tablet_id()); }); if (!st.ok()) { + tracker->remove_task(compaction_id); std::lock_guard lock(_compaction_mtx); _submitted_full_compactions.erase(tablet->tablet_id()); return Status::InternalError("failed to submit full compaction, tablet_id={}", @@ -1070,19 +1166,20 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t } Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet, - CompactionType compaction_type) { + CompactionType compaction_type, + int trigger_method) { 
DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION || compaction_type == CompactionType::BASE_COMPACTION || compaction_type == CompactionType::FULL_COMPACTION); switch (compaction_type) { case CompactionType::BASE_COMPACTION: - RETURN_IF_ERROR(_submit_base_compaction_task(tablet)); + RETURN_IF_ERROR(_submit_base_compaction_task(tablet, trigger_method)); return Status::OK(); case CompactionType::CUMULATIVE_COMPACTION: - RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet)); + RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet, trigger_method)); return Status::OK(); case CompactionType::FULL_COMPACTION: - RETURN_IF_ERROR(_submit_full_compaction_task(tablet)); + RETURN_IF_ERROR(_submit_full_compaction_task(tablet, trigger_method)); return Status::OK(); default: return Status::InternalError("unknown compaction type!"); diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 68626ec0d9e461..a737b1b0a3e010 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -136,7 +136,8 @@ class CloudStorageEngine final : public BaseStorageEngine { void get_cumu_compaction(int64_t tablet_id, std::vector>& res); - Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type); + Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type, + int trigger_method = 0); Status get_compaction_status_json(std::string* result); @@ -202,9 +203,10 @@ class CloudStorageEngine final : public BaseStorageEngine { std::vector _generate_cloud_compaction_tasks(CompactionType compaction_type, bool check_score); Status _adjust_compaction_thread_num(); - Status _submit_base_compaction_task(const CloudTabletSPtr& tablet); - Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet); - Status _submit_full_compaction_task(const CloudTabletSPtr& tablet); + Status _submit_base_compaction_task(const CloudTabletSPtr& tablet, int 
trigger_method = 0); + Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet, + int trigger_method = 0); + Status _submit_full_compaction_task(const CloudTabletSPtr& tablet, int trigger_method = 0); Status _request_tablet_global_compaction_lock(ReaderType compaction_type, const CloudTabletSPtr& tablet, std::shared_ptr compaction); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 54a65f7802af59..a2abae02b702bb 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -555,6 +555,9 @@ DEFINE_mInt32(base_compaction_trace_threshold, "60"); DEFINE_mInt32(cumulative_compaction_trace_threshold, "10"); DEFINE_mBool(disable_compaction_trace_log, "true"); +DEFINE_mBool(enable_compaction_task_tracker, "true"); +DEFINE_mInt32(compaction_task_tracker_max_records, "10000"); + // Interval to picking rowset to compact, in seconds DEFINE_mInt64(pick_rowset_to_compact_interval_sec, "86400"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 07272806481235..c81e08e30d75d1 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -613,6 +613,9 @@ DECLARE_mInt32(base_compaction_trace_threshold); DECLARE_mInt32(cumulative_compaction_trace_threshold); DECLARE_mBool(disable_compaction_trace_log); +DECLARE_mBool(enable_compaction_task_tracker); +DECLARE_mInt32(compaction_task_tracker_max_records); + // Interval to picking rowset to compact, in seconds DECLARE_mInt64(pick_rowset_to_compact_interval_sec); diff --git a/be/src/information_schema/schema_compaction_tasks_scanner.cpp b/be/src/information_schema/schema_compaction_tasks_scanner.cpp new file mode 100644 index 00000000000000..7224281706bee0 --- /dev/null +++ b/be/src/information_schema/schema_compaction_tasks_scanner.cpp @@ -0,0 +1,504 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "information_schema/schema_compaction_tasks_scanner.h" + +#include + +#include +#include +#include +#include + +#include "common/status.h" +#include "core/data_type/define_primitive_type.h" +#include "core/string_ref.h" +#include "runtime/runtime_state.h" +#include "storage/compaction_task_tracker.h" +#include "util/time.h" + +namespace doris { + +#include "common/compile_check_begin.h" + +std::vector SchemaCompactionTasksScanner::_s_tbls_columns = { + // name, type, size, is_null + {"BACKEND_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"COMPACTION_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"TABLE_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"PARTITION_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"TABLET_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"COMPACTION_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"STATUS", TYPE_VARCHAR, sizeof(StringRef), true}, + {"TRIGGER_METHOD", TYPE_VARCHAR, sizeof(StringRef), true}, + {"COMPACTION_SCORE", TYPE_BIGINT, sizeof(int64_t), true}, + {"SCHEDULED_TIME", TYPE_DATETIME, sizeof(int64_t), true}, + {"START_TIME", TYPE_DATETIME, sizeof(int64_t), true}, + {"END_TIME", TYPE_DATETIME, sizeof(int64_t), true}, + {"ELAPSED_TIME_MS", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_ROWSETS_COUNT", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_ROW_NUM", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_DATA_SIZE", TYPE_BIGINT, sizeof(int64_t), 
true}, + {"INPUT_INDEX_SIZE", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_TOTAL_SIZE", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_SEGMENTS_NUM", TYPE_BIGINT, sizeof(int64_t), true}, + {"INPUT_VERSION_RANGE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"MERGED_ROWS", TYPE_BIGINT, sizeof(int64_t), true}, + {"FILTERED_ROWS", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_ROWS", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_ROW_NUM", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_DATA_SIZE", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_INDEX_SIZE", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_TOTAL_SIZE", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_SEGMENTS_NUM", TYPE_BIGINT, sizeof(int64_t), true}, + {"OUTPUT_VERSION", TYPE_VARCHAR, sizeof(StringRef), true}, + {"MERGE_LATENCY_MS", TYPE_BIGINT, sizeof(int64_t), true}, + {"BYTES_READ_FROM_LOCAL", TYPE_BIGINT, sizeof(int64_t), true}, + {"BYTES_READ_FROM_REMOTE", TYPE_BIGINT, sizeof(int64_t), true}, + {"PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), true}, + {"IS_VERTICAL", TYPE_BOOLEAN, sizeof(bool), true}, + {"PERMITS", TYPE_BIGINT, sizeof(int64_t), true}, + {"VERTICAL_TOTAL_GROUPS", TYPE_BIGINT, sizeof(int64_t), true}, + {"VERTICAL_COMPLETED_GROUPS", TYPE_BIGINT, sizeof(int64_t), true}, + {"STATUS_MSG", TYPE_VARCHAR, sizeof(StringRef), true}, +}; + +SchemaCompactionTasksScanner::SchemaCompactionTasksScanner() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_BE_COMPACTION_TASKS), + _backend_id(0), + _task_idx(0) {}; + +Status SchemaCompactionTasksScanner::start(RuntimeState* state) { + if (!_is_init) { + return Status::InternalError("used before initialized."); + } + _backend_id = state->backend_id(); + _tasks = CompactionTaskTracker::instance()->get_all_tasks(); + return Status::OK(); +} + +Status SchemaCompactionTasksScanner::get_next_block_internal(Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + if (nullptr == block || 
nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_task_idx >= _tasks.size()) { + *eos = true; + return Status::OK(); + } + *eos = false; + return _fill_block_impl(block); +} + +Status SchemaCompactionTasksScanner::_fill_block_impl(Block* block) { + SCOPED_TIMER(_fill_block_timer); + size_t fill_num = std::min(1000UL, _tasks.size() - _task_idx); + size_t fill_idx_begin = _task_idx; + size_t fill_idx_end = _task_idx + fill_num; + std::vector datas(fill_num); + + auto now_ms = UnixMillis(); + + // col 0: BACKEND_ID + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _backend_id; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 0, datas)); + } + // col 1: COMPACTION_ID + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].compaction_id; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 1, datas)); + } + // col 2: TABLE_ID + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].table_id; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 2, datas)); + } + // col 3: PARTITION_ID + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].partition_id; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 3, datas)); + } + // col 4: TABLET_ID + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].tablet_id; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + 
RETURN_IF_ERROR(fill_dest_column_for_range(block, 4, datas)); + } + // col 5: COMPACTION_TYPE + { + std::vector strs(fill_num); + std::vector refs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + strs[i - fill_idx_begin] = to_string(_tasks[i].compaction_type); + refs[i - fill_idx_begin] = + StringRef(strs[i - fill_idx_begin].c_str(), strs[i - fill_idx_begin].size()); + datas[i - fill_idx_begin] = refs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 5, datas)); + } + // col 6: STATUS + { + std::vector strs(fill_num); + std::vector refs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + strs[i - fill_idx_begin] = to_string(_tasks[i].status); + refs[i - fill_idx_begin] = + StringRef(strs[i - fill_idx_begin].c_str(), strs[i - fill_idx_begin].size()); + datas[i - fill_idx_begin] = refs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 6, datas)); + } + // col 7: TRIGGER_METHOD + { + std::vector strs(fill_num); + std::vector refs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + strs[i - fill_idx_begin] = to_string(_tasks[i].trigger_method); + refs[i - fill_idx_begin] = + StringRef(strs[i - fill_idx_begin].c_str(), strs[i - fill_idx_begin].size()); + datas[i - fill_idx_begin] = refs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 7, datas)); + } + // col 8: COMPACTION_SCORE + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].compaction_score; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 8, datas)); + } + // col 9: SCHEDULED_TIME (DATETIME, always set) + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + int64_t ts_ms = _tasks[i].scheduled_time_ms; + if (ts_ms > 0) { + srcs[i - 
fill_idx_begin].from_unixtime(ts_ms / 1000, _timezone_obj); + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } else { + datas[i - fill_idx_begin] = nullptr; + } + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 9, datas)); + } + // col 10: START_TIME (DATETIME, nullable: 0 when PENDING) + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + int64_t ts_ms = _tasks[i].start_time_ms; + if (ts_ms > 0) { + srcs[i - fill_idx_begin].from_unixtime(ts_ms / 1000, _timezone_obj); + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } else { + datas[i - fill_idx_begin] = nullptr; + } + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 10, datas)); + } + // col 11: END_TIME (DATETIME, nullable: 0 when not completed) + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + int64_t ts_ms = _tasks[i].end_time_ms; + if (ts_ms > 0) { + srcs[i - fill_idx_begin].from_unixtime(ts_ms / 1000, _timezone_obj); + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } else { + datas[i - fill_idx_begin] = nullptr; + } + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas)); + } + // col 12: ELAPSED_TIME_MS + // RUNNING: now - start_time; FINISHED/FAILED: end - start; PENDING: 0 + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + const auto& task = _tasks[i]; + if (task.status == CompactionTaskStatus::RUNNING && task.start_time_ms > 0) { + srcs[i - fill_idx_begin] = now_ms - task.start_time_ms; + } else if ((task.status == CompactionTaskStatus::FINISHED || + task.status == CompactionTaskStatus::FAILED) && + task.start_time_ms > 0) { + srcs[i - fill_idx_begin] = task.end_time_ms - task.start_time_ms; + } else { + srcs[i - fill_idx_begin] = 0; + } + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas)); + } + // col 13: INPUT_ROWSETS_COUNT 
+ { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_rowsets_count; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 13, datas)); + } + // col 14: INPUT_ROW_NUM + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_row_num; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 14, datas)); + } + // col 15: INPUT_DATA_SIZE + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_data_size; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 15, datas)); + } + // col 16: INPUT_INDEX_SIZE + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_index_size; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 16, datas)); + } + // col 17: INPUT_TOTAL_SIZE + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_total_size; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 17, datas)); + } + // col 18: INPUT_SEGMENTS_NUM + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].input_segments_num; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 18, datas)); + } + // col 19: INPUT_VERSION_RANGE + { + std::vector refs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + 
const auto& s = _tasks[i].input_version_range; + refs[i - fill_idx_begin] = StringRef(s.c_str(), s.size()); + datas[i - fill_idx_begin] = refs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 19, datas)); + } + // col 20: MERGED_ROWS + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].merged_rows; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 20, datas)); + } + // col 21: FILTERED_ROWS + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].filtered_rows; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 21, datas)); + } + // col 22: OUTPUT_ROWS + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].output_rows; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 22, datas)); + } + // col 23: OUTPUT_ROW_NUM + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].output_row_num; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 23, datas)); + } + // col 24: OUTPUT_DATA_SIZE + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].output_data_size; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 24, datas)); + } + // col 25: OUTPUT_INDEX_SIZE + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].output_index_size; + datas[i - 
fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 25, datas)); + } + // col 26: OUTPUT_TOTAL_SIZE + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].output_total_size; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 26, datas)); + } + // col 27: OUTPUT_SEGMENTS_NUM + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].output_segments_num; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 27, datas)); + } + // col 28: OUTPUT_VERSION + { + std::vector refs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + const auto& s = _tasks[i].output_version; + refs[i - fill_idx_begin] = StringRef(s.c_str(), s.size()); + datas[i - fill_idx_begin] = refs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 28, datas)); + } + // col 29: MERGE_LATENCY_MS + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].merge_latency_ms; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 29, datas)); + } + // col 30: BYTES_READ_FROM_LOCAL + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].bytes_read_from_local; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 30, datas)); + } + // col 31: BYTES_READ_FROM_REMOTE + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].bytes_read_from_remote; + datas[i - fill_idx_begin] = 
srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 31, datas)); + } + // col 32: PEAK_MEMORY_BYTES + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].peak_memory_bytes; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 32, datas)); + } + // col 33: IS_VERTICAL (BOOLEAN) + // Note: std::vector is bit-packed and does not provide real pointers, + // so we use a std::unique_ptr instead. + { + auto srcs = std::make_unique(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].is_vertical; + datas[i - fill_idx_begin] = &srcs[i - fill_idx_begin]; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 33, datas)); + } + // col 34: PERMITS + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].permits; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 34, datas)); + } + // col 35: VERTICAL_TOTAL_GROUPS + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].vertical_total_groups; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 35, datas)); + } + // col 36: VERTICAL_COMPLETED_GROUPS + { + std::vector srcs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + srcs[i - fill_idx_begin] = _tasks[i].vertical_completed_groups; + datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 36, datas)); + } + // col 37: STATUS_MSG + { + std::vector refs(fill_num); + for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) { + const auto& s = _tasks[i].status_msg; + refs[i - 
fill_idx_begin] = StringRef(s.c_str(), s.size()); + datas[i - fill_idx_begin] = refs.data() + i - fill_idx_begin; + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 37, datas)); + } + + _task_idx += fill_num; + return Status::OK(); +} +} // namespace doris diff --git a/be/src/information_schema/schema_compaction_tasks_scanner.h b/be/src/information_schema/schema_compaction_tasks_scanner.h new file mode 100644 index 00000000000000..122806527c6abd --- /dev/null +++ b/be/src/information_schema/schema_compaction_tasks_scanner.h @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include +#include + +#include "common/status.h" +#include "information_schema/schema_scanner.h" +#include "storage/compaction_task_tracker.h" + +namespace doris { +class RuntimeState; +class Block; + +class SchemaCompactionTasksScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaCompactionTasksScanner); + +public: + SchemaCompactionTasksScanner(); + ~SchemaCompactionTasksScanner() override = default; + + Status start(RuntimeState* state) override; + Status get_next_block_internal(Block* block, bool* eos) override; + +private: + Status _fill_block_impl(Block* block); + + static std::vector _s_tbls_columns; + int64_t _backend_id = 0; + size_t _task_idx = 0; + std::vector _tasks; +}; +} // namespace doris diff --git a/be/src/information_schema/schema_scanner.cpp b/be/src/information_schema/schema_scanner.cpp index dcf5733ee115ab..40f952921f3e40 100644 --- a/be/src/information_schema/schema_scanner.cpp +++ b/be/src/information_schema/schema_scanner.cpp @@ -54,6 +54,7 @@ #include "information_schema/schema_collations_scanner.h" #include "information_schema/schema_column_data_sizes_scanner.h" #include "information_schema/schema_columns_scanner.h" +#include "information_schema/schema_compaction_tasks_scanner.h" #include "information_schema/schema_database_properties_scanner.h" #include "information_schema/schema_dummy_scanner.h" #include "information_schema/schema_encryption_keys_scanner.h" @@ -264,6 +265,8 @@ std::unique_ptr SchemaScanner::create(TSchemaTableType::type type return SchemaFileCacheInfoScanner::create_unique(); case TSchemaTableType::SCH_AUTHENTICATION_INTEGRATIONS: return SchemaAuthenticationIntegrationsScanner::create_unique(); + case TSchemaTableType::SCH_BE_COMPACTION_TASKS: + return SchemaCompactionTasksScanner::create_unique(); default: return SchemaDummyScanner::create_unique(); break; diff --git a/be/src/service/http/action/compaction_action.cpp b/be/src/service/http/action/compaction_action.cpp index 
fc49a74954d902..1c4989d4610453 100644 --- a/be/src/service/http/action/compaction_action.cpp +++ b/be/src/service/http/action/compaction_action.cpp @@ -42,6 +42,7 @@ #include "storage/compaction/cumulative_compaction_time_series_policy.h" #include "storage/compaction/full_compaction.h" #include "storage/compaction/single_replica_compaction.h" +#include "storage/compaction_task_tracker.h" #include "storage/olap_define.h" #include "storage/storage_engine.h" #include "storage/tablet/tablet_manager.h" @@ -154,8 +155,8 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j [table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; }); for (const auto& tablet : tablet_vec) { tablet->set_last_full_compaction_schedule_time(UnixMillis()); - RETURN_IF_ERROR( - _engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, false)); + RETURN_IF_ERROR(_engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, + false, true, 1)); } } else { // 2. fetch the tablet by tablet_id @@ -303,13 +304,37 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, tablet->set_cumulative_compaction_policy(cumulative_compaction_policy); } Status res = Status::OK(); - auto do_compact = [](Compaction& compaction) { + auto* tracker = CompactionTaskTracker::instance(); + auto do_compact = [&](Compaction& compaction, CompactionProfileType profile_type) { RETURN_IF_ERROR(compaction.prepare_compact()); - return compaction.execute_compact(); + // Register task as RUNNING with tracker (manual trigger, direct execution path) + // Use compaction.compaction_id() which was allocated in constructor. 
+ int64_t compaction_id = compaction.compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.tablet_id = tablet->tablet_id(); + info.table_id = tablet->get_table_id(); + info.partition_id = tablet->partition_id(); + info.compaction_type = profile_type; + info.status = CompactionTaskStatus::RUNNING; + info.trigger_method = TriggerMethod::MANUAL; + auto now_ms = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + info.scheduled_time_ms = now_ms; + info.start_time_ms = now_ms; + tracker->register_task(std::move(info)); + } + auto st = compaction.execute_compact(); + // Idempotent cleanup: if execute returned early (e.g. TRY_LOCK_FAILED) + // before submit_profile_record was called, task remains in active_tasks. + tracker->remove_task(compaction_id); + return st; }; if (compaction_type == PARAM_COMPACTION_BASE) { BaseCompaction base_compaction(_engine, tablet); - res = do_compact(base_compaction); + res = do_compact(base_compaction, CompactionProfileType::BASE); if (!res) { if (!res.is()) { DorisMetrics::instance()->base_compaction_request_failed->increment(1); @@ -319,14 +344,14 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, if (fetch_from_remote) { SingleReplicaCompaction single_compaction(_engine, tablet, CompactionType::CUMULATIVE_COMPACTION); - res = do_compact(single_compaction); + res = do_compact(single_compaction, CompactionProfileType::CUMULATIVE); if (!res) { LOG(WARNING) << "failed to do single compaction. res=" << res << ", table=" << tablet->tablet_id(); } } else { CumulativeCompaction cumulative_compaction(_engine, tablet); - res = do_compact(cumulative_compaction); + res = do_compact(cumulative_compaction, CompactionProfileType::CUMULATIVE); if (!res) { if (res.is()) { // Ignore this error code. 
@@ -342,7 +367,7 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, } } else if (compaction_type == PARAM_COMPACTION_FULL) { FullCompaction full_compaction(_engine, tablet); - res = do_compact(full_compaction); + res = do_compact(full_compaction, CompactionProfileType::FULL); if (!res) { if (res.is()) { // Ignore this error code. diff --git a/be/src/service/http/action/compaction_profile_action.cpp b/be/src/service/http/action/compaction_profile_action.cpp new file mode 100644 index 00000000000000..2f4dedd8b18aef --- /dev/null +++ b/be/src/service/http/action/compaction_profile_action.cpp @@ -0,0 +1,270 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "service/http/action/compaction_profile_action.h" + +#include +#include +#include + +#include +#include +#include + +#include "common/logging.h" +#include "service/http/http_channel.h" +#include "service/http/http_headers.h" +#include "service/http/http_request.h" +#include "service/http/http_status.h" +#include "storage/compaction_task_tracker.h" + +namespace doris { + +#include "common/compile_check_begin.h" + +namespace { + +// Format millisecond timestamp to "YYYY-MM-DD HH:MM:SS" string. 
+// Returns empty string for 0 timestamps. +std::string format_timestamp_ms(int64_t timestamp_ms) { + if (timestamp_ms <= 0) { + return ""; + } + time_t ts = static_cast(timestamp_ms / 1000); + struct tm local_tm; + if (localtime_r(&ts, &local_tm) == nullptr) { + return ""; + } + char buf[64]; + strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &local_tm); + return std::string(buf); +} + +} // namespace + +CompactionProfileAction::CompactionProfileAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type ptype) + : HttpHandlerWithAuth(exec_env, hier, ptype) {} + +void CompactionProfileAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JSON_TYPE.data()); + + // Parse optional parameters + int64_t tablet_id = 0; + int64_t top_n = 0; + std::string compact_type; + int success_filter = -1; // -1 = no filter, 0 = failed only, 1 = success only + + // tablet_id + const auto& tablet_id_str = req->param("tablet_id"); + if (!tablet_id_str.empty()) { + try { + tablet_id = std::stoll(tablet_id_str); + if (tablet_id < 0) { + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, + R"({"status":"Error","msg":"tablet_id must be >= 0"})"); + return; + } + } catch (const std::exception& e) { + auto msg = R"({"status":"Error","msg":"invalid tablet_id: )" + std::string(e.what()) + + "\"}"; + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg); + return; + } + } + + // top_n + const auto& top_n_str = req->param("top_n"); + if (!top_n_str.empty()) { + try { + top_n = std::stoll(top_n_str); + if (top_n < 0) { + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, + R"({"status":"Error","msg":"top_n must be >= 0"})"); + return; + } + } catch (const std::exception& e) { + auto msg = + R"({"status":"Error","msg":"invalid top_n: )" + std::string(e.what()) + "\"}"; + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg); + return; + } + } + + // compact_type + compact_type = req->param("compact_type"); + if 
(!compact_type.empty() && compact_type != "base" && compact_type != "cumulative" && + compact_type != "full") { + HttpChannel::send_reply( + req, HttpStatus::BAD_REQUEST, + R"({"status":"Error","msg":"compact_type must be one of: base, cumulative, full"})"); + return; + } + + // success + const auto& success_str = req->param("success"); + if (!success_str.empty()) { + if (success_str == "true") { + success_filter = 1; + } else if (success_str == "false") { + success_filter = 0; + } else { + HttpChannel::send_reply( + req, HttpStatus::BAD_REQUEST, + R"({"status":"Error","msg":"success must be 'true' or 'false'"})"); + return; + } + } + + // Get completed tasks from tracker + auto tasks = CompactionTaskTracker::instance()->get_completed_tasks( + tablet_id, top_n, compact_type, success_filter); + + // Build JSON response + rapidjson::Document root; + root.SetObject(); + auto& allocator = root.GetAllocator(); + + root.AddMember("status", "Success", allocator); + + rapidjson::Value profiles(rapidjson::kArrayType); + + for (const auto& task : tasks) { + rapidjson::Value profile(rapidjson::kObjectType); + + profile.AddMember("compaction_id", task.compaction_id, allocator); + + { + rapidjson::Value v; + v.SetString(to_string(task.compaction_type), allocator); + profile.AddMember("compaction_type", v, allocator); + } + + profile.AddMember("tablet_id", task.tablet_id, allocator); + profile.AddMember("table_id", task.table_id, allocator); + profile.AddMember("partition_id", task.partition_id, allocator); + + { + rapidjson::Value v; + v.SetString(to_string(task.trigger_method), allocator); + profile.AddMember("trigger_method", v, allocator); + } + + profile.AddMember("compaction_score", task.compaction_score, allocator); + + // Datetime fields + { + auto s = format_timestamp_ms(task.scheduled_time_ms); + rapidjson::Value v; + v.SetString(s.c_str(), static_cast(s.size()), allocator); + profile.AddMember("scheduled_time", v, allocator); + } + { + auto s = 
format_timestamp_ms(task.start_time_ms); + rapidjson::Value v; + v.SetString(s.c_str(), static_cast(s.size()), allocator); + profile.AddMember("start_time", v, allocator); + } + { + auto s = format_timestamp_ms(task.end_time_ms); + rapidjson::Value v; + v.SetString(s.c_str(), static_cast(s.size()), allocator); + profile.AddMember("end_time", v, allocator); + } + + // Derived: cost_time_ms = end_time_ms - start_time_ms + int64_t cost_time_ms = 0; + if (task.start_time_ms > 0 && task.end_time_ms > 0) { + cost_time_ms = task.end_time_ms - task.start_time_ms; + } + profile.AddMember("cost_time_ms", cost_time_ms, allocator); + + // Derived: success = (status == FINISHED) + bool success = (task.status == CompactionTaskStatus::FINISHED); + profile.AddMember("success", success, allocator); + + // Input statistics + profile.AddMember("input_rowsets_count", task.input_rowsets_count, allocator); + profile.AddMember("input_row_num", task.input_row_num, allocator); + profile.AddMember("input_data_size", task.input_data_size, allocator); + profile.AddMember("input_index_size", task.input_index_size, allocator); + profile.AddMember("input_total_size", task.input_total_size, allocator); + profile.AddMember("input_segments_num", task.input_segments_num, allocator); + + { + rapidjson::Value v; + v.SetString(task.input_version_range.c_str(), + static_cast(task.input_version_range.size()), + allocator); + profile.AddMember("input_version_range", v, allocator); + } + + // Output statistics + profile.AddMember("merged_rows", task.merged_rows, allocator); + profile.AddMember("filtered_rows", task.filtered_rows, allocator); + profile.AddMember("output_rows", task.output_rows, allocator); + profile.AddMember("output_row_num", task.output_row_num, allocator); + profile.AddMember("output_data_size", task.output_data_size, allocator); + profile.AddMember("output_index_size", task.output_index_size, allocator); + profile.AddMember("output_total_size", task.output_total_size, allocator); + 
profile.AddMember("output_segments_num", task.output_segments_num, allocator); + + { + rapidjson::Value v; + v.SetString(task.output_version.c_str(), + static_cast(task.output_version.size()), allocator); + profile.AddMember("output_version", v, allocator); + } + + // Merge performance + profile.AddMember("merge_latency_ms", task.merge_latency_ms, allocator); + + // IO statistics + profile.AddMember("bytes_read_from_local", task.bytes_read_from_local, allocator); + profile.AddMember("bytes_read_from_remote", task.bytes_read_from_remote, allocator); + + // Resources + profile.AddMember("peak_memory_bytes", task.peak_memory_bytes, allocator); + profile.AddMember("is_vertical", task.is_vertical, allocator); + profile.AddMember("permits", task.permits, allocator); + + // Vertical compaction progress + profile.AddMember("vertical_total_groups", task.vertical_total_groups, allocator); + profile.AddMember("vertical_completed_groups", task.vertical_completed_groups, allocator); + + // Status message (only for failed tasks) + if (!task.status_msg.empty()) { + rapidjson::Value v; + v.SetString(task.status_msg.c_str(), + static_cast(task.status_msg.size()), allocator); + profile.AddMember("status_msg", v, allocator); + } + + profiles.PushBack(profile, allocator); + } + + root.AddMember("compaction_profiles", profiles, allocator); + + rapidjson::StringBuffer str_buf; + rapidjson::PrettyWriter writer(str_buf); + root.Accept(writer); + + HttpChannel::send_reply(req, HttpStatus::OK, str_buf.GetString()); +} + +#include "common/compile_check_end.h" +} // namespace doris diff --git a/be/src/service/http/action/compaction_profile_action.h b/be/src/service/http/action/compaction_profile_action.h new file mode 100644 index 00000000000000..bc0a2ccb217588 --- /dev/null +++ b/be/src/service/http/action/compaction_profile_action.h @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "service/http/http_handler_with_auth.h" + +namespace doris { + +class ExecEnv; +class HttpRequest; + +class CompactionProfileAction : public HttpHandlerWithAuth { +public: + CompactionProfileAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type ptype); + ~CompactionProfileAction() override = default; + + void handle(HttpRequest* req) override; +}; + +} // namespace doris diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index f97c5ebde5ac09..a54be39a230011 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -43,6 +43,7 @@ #include "service/http/action/checksum_action.h" #include "service/http/action/clear_cache_action.h" #include "service/http/action/compaction_action.h" +#include "service/http/action/compaction_profile_action.h" #include "service/http/action/compaction_score_action.h" #include "service/http/action/config_action.h" #include "service/http/action/debug_point_action.h" @@ -408,6 +409,11 @@ void HttpService::register_local_handler(StorageEngine& engine) { _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status", run_status_compaction_action); + CompactionProfileAction* compaction_profile_action = _pool.add( + new 
CompactionProfileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/profile", + compaction_profile_action); + DeleteBitmapAction* count_delete_bitmap_action = _pool.add(new DeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); @@ -471,6 +477,12 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) { TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status", run_status_compaction_action); + + CompactionProfileAction* compaction_profile_action = _pool.add( + new CompactionProfileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/profile", + compaction_profile_action); + DeleteBitmapAction* count_local_delete_bitmap_action = _pool.add(new DeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); diff --git a/be/src/storage/compaction/base_compaction.h b/be/src/storage/compaction/base_compaction.h index 453583f8227abf..a450e845aef3fa 100644 --- a/be/src/storage/compaction/base_compaction.h +++ b/be/src/storage/compaction/base_compaction.h @@ -17,12 +17,14 @@ #pragma once +#include #include #include #include "common/status.h" #include "io/io_common.h" #include "storage/compaction/compaction.h" +#include "storage/compaction_task_tracker.h" namespace doris { @@ -40,6 +42,10 @@ class BaseCompaction final : public CompactionMixin { Status execute_compact() override; + std::optional profile_type() const override { + return CompactionProfileType::BASE; + } + private: Status pick_rowsets_to_compact(); std::string_view compaction_name() const override { return "base compaction"; } diff --git a/be/src/storage/compaction/compaction.cpp b/be/src/storage/compaction/compaction.cpp index 
698c81f7849fd0..24f654d5872321 100644 --- a/be/src/storage/compaction/compaction.cpp +++ b/be/src/storage/compaction/compaction.cpp @@ -56,6 +56,7 @@ #include "storage/compaction/cumulative_compaction.h" #include "storage/compaction/cumulative_compaction_policy.h" #include "storage/compaction/cumulative_compaction_time_series_policy.h" +#include "storage/compaction_task_tracker.h" #include "storage/data_dir.h" #include "storage/index/index_file_reader.h" #include "storage/index/index_file_writer.h" @@ -154,7 +155,8 @@ bool is_rowset_tidy(std::string& pre_max_key, bool& pre_rs_key_bounds_truncated, } // namespace Compaction::Compaction(BaseTabletSPtr tablet, const std::string& label) - : _mem_tracker( + : _compaction_id(CompactionTaskTracker::instance()->next_compaction_id()), + _mem_tracker( MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, label)), _tablet(std::move(tablet)), _is_vertical(config::enable_vertical_compaction), @@ -177,6 +179,47 @@ Compaction::~Compaction() { _rowid_conversion.reset(); } +std::string Compaction::input_version_range_str() const { + if (_input_rowsets.empty()) return ""; + return fmt::format("[{}-{}]", _input_rowsets.front()->start_version(), + _input_rowsets.back()->end_version()); +} + +void Compaction::submit_profile_record(bool success, int64_t start_time_ms, + const std::string& status_msg) { + if (!profile_type().has_value()) { + return; + } + auto* tracker = CompactionTaskTracker::instance(); + CompletionStats stats; + stats.input_version_range = input_version_range_str(); + stats.end_time_ms = UnixMillis(); + stats.merged_rows = _stats.merged_rows; + stats.filtered_rows = _stats.filtered_rows; + stats.output_rows = _stats.output_rows; + if (_output_rowset) { + stats.output_row_num = _output_rowset->num_rows(); + stats.output_data_size = _output_rowset->data_disk_size(); + stats.output_index_size = _output_rowset->index_disk_size(); + stats.output_total_size = _output_rowset->total_disk_size(); + 
stats.output_segments_num = _output_rowset->num_segments(); + } + stats.output_version = _output_version.to_string(); + if (_merge_rowsets_latency_timer) { + stats.merge_latency_ms = _merge_rowsets_latency_timer->value() / 1000000; + } + stats.bytes_read_from_local = _stats.bytes_read_from_local; + stats.bytes_read_from_remote = _stats.bytes_read_from_remote; + if (_mem_tracker) { + stats.peak_memory_bytes = _mem_tracker->peak_consumption(); + } + if (success) { + tracker->complete(_compaction_id, stats); + } else { + tracker->fail(_compaction_id, stats, status_msg); + } +} + void Compaction::init_profile(const std::string& label) { _profile = std::make_unique(label); @@ -236,10 +279,14 @@ Status Compaction::merge_input_rowsets() { if (!_tablet->tablet_schema()->cluster_key_uids().empty()) { RETURN_IF_ERROR(update_delete_bitmap()); } + auto progress_cb = [compaction_id = this->_compaction_id](int64_t total, + int64_t completed) { + CompactionTaskTracker::instance()->update_progress(compaction_id, total, completed); + }; res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), cast_set(get_avg_segment_rows()), - way_num, &_stats); + way_num, &_stats, progress_cb); } else { if (!_tablet->tablet_schema()->cluster_key_uids().empty()) { return Status::InternalError( @@ -533,13 +580,18 @@ bool CompactionMixin::handle_ordered_data_compaction() { } Status CompactionMixin::execute_compact() { + int64_t profile_start_time_ms = UnixMillis(); uint32_t checksum_before; uint32_t checksum_after; bool enable_compaction_checksum = config::enable_compaction_checksum; if (enable_compaction_checksum) { EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), _input_rowsets.back()->end_version(), &checksum_before); - RETURN_IF_ERROR(checksum_task.execute()); + auto st = checksum_task.execute(); + if (!st.ok()) { + submit_profile_record(false, profile_start_time_ms, st.to_string()); + 
return st; + } } auto* data_dir = tablet()->data_dir(); @@ -559,11 +611,17 @@ Status CompactionMixin::execute_compact() { if (enable_compaction_checksum) { EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), _input_rowsets.back()->end_version(), &checksum_after); - RETURN_IF_ERROR(checksum_task.execute()); + auto st = checksum_task.execute(); + if (!st.ok()) { + submit_profile_record(false, profile_start_time_ms, st.to_string()); + return st; + } if (checksum_before != checksum_after) { - return Status::InternalError( + auto mismatch_st = Status::InternalError( "compaction tablet checksum not consistent, before={}, after={}, tablet_id={}", checksum_before, checksum_after, _tablet->tablet_id()); + submit_profile_record(false, profile_start_time_ms, mismatch_st.to_string()); + return mismatch_st; } } @@ -579,6 +637,7 @@ Status CompactionMixin::execute_compact() { _output_rowset->total_disk_size()); _load_segment_to_cache(); + submit_profile_record(true, profile_start_time_ms); return Status::OK(); } @@ -1671,6 +1730,7 @@ size_t CloudCompactionMixin::apply_txn_size_truncation_and_log(const std::string } Status CloudCompactionMixin::execute_compact() { + int64_t profile_start_time_ms = UnixMillis(); TEST_INJECTION_POINT("Compaction::do_compaction"); int64_t permits = get_compaction_permits(); HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( @@ -1686,6 +1746,7 @@ Status CloudCompactionMixin::execute_compact() { _tablet->table_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator(), _tablet->tablet_id()); } + submit_profile_record(false, profile_start_time_ms, ex.what()); }); DorisMetrics::instance()->remote_compaction_read_rows_total->increment(_input_row_num); @@ -1695,6 +1756,7 @@ Status CloudCompactionMixin::execute_compact() { _output_rowset->total_disk_size()); _load_segment_to_cache(); + submit_profile_record(true, profile_start_time_ms); return Status::OK(); } diff --git a/be/src/storage/compaction/compaction.h 
b/be/src/storage/compaction/compaction.h index 772c8b30aa4172..a8227884028104 100644 --- a/be/src/storage/compaction/compaction.h +++ b/be/src/storage/compaction/compaction.h @@ -28,6 +28,7 @@ #if defined(__clang__) #pragma clang diagnostic pop #endif +#include #include #include @@ -35,6 +36,7 @@ #include "common/status.h" #include "io/io_common.h" #include "runtime/runtime_profile.h" +#include "storage/compaction_task_tracker.h" #include "storage/merger.h" #include "storage/olap_common.h" #include "storage/rowid_conversion.h" @@ -75,6 +77,21 @@ class Compaction { virtual ReaderType compaction_type() const = 0; virtual std::string_view compaction_name() const = 0; + // Returns compaction profile type for task tracking. + // Default returns std::nullopt (not tracked). Only base/cumulative/full override. + virtual std::optional profile_type() const { return std::nullopt; } + + // Accessors for tracker registration + int64_t compaction_id() const { return _compaction_id; } + int64_t input_rowsets_data_size() const { return _input_rowsets_data_size; } + int64_t input_rowsets_index_size() const { return _input_rowsets_index_size; } + int64_t input_rowsets_total_size() const { return _input_rowsets_total_size; } + int64_t input_row_num_value() const { return _input_row_num; } + int64_t input_rowsets_count() const { return static_cast(_input_rowsets.size()); } + int64_t input_segments_num_value() const { return _input_num_segments; } + bool is_vertical() const { return _is_vertical; } + std::string input_version_range_str() const; + // the difference between index change compmaction and other compaction. // 1. delete predicate should be kept when input is cumu rowset. // 2. inverted compaction should be skipped. 
@@ -110,6 +127,11 @@ class Compaction { virtual Status update_delete_bitmap() = 0; + int64_t _compaction_id {0}; + + void submit_profile_record(bool success, int64_t start_time_ms, + const std::string& status_msg = ""); + // the root tracker for this compaction std::shared_ptr<MemTrackerLimiter> _mem_tracker; diff --git a/be/src/storage/compaction/cumulative_compaction.h b/be/src/storage/compaction/cumulative_compaction.h index 9e5bbbfcfb5241..a0dae39d589262 100644 --- a/be/src/storage/compaction/cumulative_compaction.h +++ b/be/src/storage/compaction/cumulative_compaction.h @@ -17,12 +17,14 @@ #pragma once +#include <optional> #include #include #include "common/status.h" #include "io/io_common.h" #include "storage/compaction/compaction.h" +#include "storage/compaction_task_tracker.h" #include "storage/olap_common.h" namespace doris { @@ -37,6 +39,10 @@ class CumulativeCompaction final : public CompactionMixin { Status execute_compact() override; + std::optional<CompactionProfileType> profile_type() const override { + return CompactionProfileType::CUMULATIVE; + } + private: std::string_view compaction_name() const override { return "cumulative compaction"; } diff --git a/be/src/storage/compaction/full_compaction.h b/be/src/storage/compaction/full_compaction.h index fb80613f722db6..6f180d4165157e 100644 --- a/be/src/storage/compaction/full_compaction.h +++ b/be/src/storage/compaction/full_compaction.h @@ -19,12 +19,14 @@ #include +#include <optional> #include #include #include "common/status.h" #include "io/io_common.h" #include "storage/compaction/compaction.h" +#include "storage/compaction_task_tracker.h" namespace doris { @@ -38,6 +40,10 @@ class FullCompaction final : public CompactionMixin { Status execute_compact() override; + std::optional<CompactionProfileType> profile_type() const override { + return CompactionProfileType::FULL; + } + private: Status pick_rowsets_to_compact(); diff --git a/be/src/storage/compaction_task_tracker.cpp b/be/src/storage/compaction_task_tracker.cpp new file mode 100644 index 00000000000000..d2275d21cad332 --- 
/dev/null +++ b/be/src/storage/compaction_task_tracker.cpp @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/compaction_task_tracker.h" + +#include "common/config.h" +#include "common/logging.h" + +namespace doris { + +const char* to_string(CompactionProfileType type) { + switch (type) { + case CompactionProfileType::BASE: + return "base"; + case CompactionProfileType::CUMULATIVE: + return "cumulative"; + case CompactionProfileType::FULL: + return "full"; + } + return "unknown"; +} + +const char* to_string(CompactionTaskStatus status) { + switch (status) { + case CompactionTaskStatus::PENDING: + return "PENDING"; + case CompactionTaskStatus::RUNNING: + return "RUNNING"; + case CompactionTaskStatus::FINISHED: + return "FINISHED"; + case CompactionTaskStatus::FAILED: + return "FAILED"; + } + return "UNKNOWN"; +} + +const char* to_string(TriggerMethod method) { + switch (method) { + case TriggerMethod::AUTO: + return "AUTO"; + case TriggerMethod::MANUAL: + return "MANUAL"; + case TriggerMethod::LOAD_TRIGGERED: + return "LOAD_TRIGGERED"; + } + return "UNKNOWN"; +} + +CompactionTaskTracker* CompactionTaskTracker::instance() { + static CompactionTaskTracker s_instance; + return 
&s_instance; +} + +void CompactionTaskTracker::register_task(CompactionTaskInfo info) { + if (!config::enable_compaction_task_tracker) { + return; + } + std::unique_lock wlock(_mutex); + _active_tasks[info.compaction_id] = std::move(info); +} + +void CompactionTaskTracker::update_to_running(int64_t compaction_id, const RunningStats& stats) { + if (!config::enable_compaction_task_tracker) { + return; + } + std::unique_lock wlock(_mutex); + auto it = _active_tasks.find(compaction_id); + if (it != _active_tasks.end()) { + auto& task = it->second; + task.status = CompactionTaskStatus::RUNNING; + task.start_time_ms = stats.start_time_ms; + task.is_vertical = stats.is_vertical; + task.permits = stats.permits; + } +} + +void CompactionTaskTracker::update_progress(int64_t compaction_id, int64_t total_groups, + int64_t completed_groups) { + if (!config::enable_compaction_task_tracker) { + return; + } + std::unique_lock wlock(_mutex); + auto it = _active_tasks.find(compaction_id); + if (it != _active_tasks.end()) { + auto& task = it->second; + task.vertical_total_groups = total_groups; + task.vertical_completed_groups = completed_groups; + } +} + +void CompactionTaskTracker::complete(int64_t compaction_id, const CompletionStats& stats) { + if (!config::enable_compaction_task_tracker) { + return; + } + std::unique_lock wlock(_mutex); + auto it = _active_tasks.find(compaction_id); + if (it == _active_tasks.end()) { + LOG(WARNING) << "compaction_id " << compaction_id << " not found in active_tasks, skip"; + return; + } + + // Extract the task from active map. 
+ auto node = _active_tasks.extract(it); + CompactionTaskInfo& info = node.mapped(); + info.status = CompactionTaskStatus::FINISHED; + _apply_completion(info, stats); + + if (config::compaction_task_tracker_max_records > 0) { + _completed_tasks.push_back(std::move(info)); + _trim_completed_locked(); + } +} + +void CompactionTaskTracker::fail(int64_t compaction_id, const CompletionStats& stats, + const std::string& msg) { + if (!config::enable_compaction_task_tracker) { + return; + } + std::unique_lock wlock(_mutex); + auto it = _active_tasks.find(compaction_id); + if (it == _active_tasks.end()) { + LOG(WARNING) << "compaction_id " << compaction_id << " not found in active_tasks, skip"; + return; + } + + // Extract the task from active map. + auto node = _active_tasks.extract(it); + CompactionTaskInfo& info = node.mapped(); + info.status = CompactionTaskStatus::FAILED; + info.status_msg = msg; + _apply_completion(info, stats); + + if (config::compaction_task_tracker_max_records > 0) { + _completed_tasks.push_back(std::move(info)); + _trim_completed_locked(); + } +} + +void CompactionTaskTracker::remove_task(int64_t compaction_id) { + if (!config::enable_compaction_task_tracker) { + return; + } + std::unique_lock wlock(_mutex); + _active_tasks.erase(compaction_id); // idempotent: no-op if already removed +} + +void CompactionTaskTracker::_apply_completion(CompactionTaskInfo& info, + const CompletionStats& stats) { + info.end_time_ms = stats.end_time_ms; + info.merged_rows = stats.merged_rows; + info.filtered_rows = stats.filtered_rows; + info.output_rows = stats.output_rows; + info.output_row_num = stats.output_row_num; + info.output_data_size = stats.output_data_size; + info.output_index_size = stats.output_index_size; + info.output_total_size = stats.output_total_size; + info.output_segments_num = stats.output_segments_num; + info.output_version = stats.output_version; + info.merge_latency_ms = stats.merge_latency_ms; + info.bytes_read_from_local = 
stats.bytes_read_from_local; + info.bytes_read_from_remote = stats.bytes_read_from_remote; + info.peak_memory_bytes = stats.peak_memory_bytes; + // Backfill input_version_range if it was empty at register time. + if (info.input_version_range.empty() && !stats.input_version_range.empty()) { + info.input_version_range = stats.input_version_range; + } +} + +void CompactionTaskTracker::_trim_completed_locked() { + int32_t max = config::compaction_task_tracker_max_records; + if (max <= 0) { + _completed_tasks.clear(); + return; + } + while (static_cast<int32_t>(_completed_tasks.size()) > max) { + _completed_tasks.pop_front(); + } +} + +std::vector<CompactionTaskInfo> CompactionTaskTracker::get_all_tasks() const { + std::shared_lock rlock(_mutex); + std::vector<CompactionTaskInfo> result; + result.reserve(_active_tasks.size() + _completed_tasks.size()); + for (const auto& [id, info] : _active_tasks) { + result.push_back(info); + } + for (const auto& info : _completed_tasks) { + result.push_back(info); + } + return result; +} + +std::vector<CompactionTaskInfo> CompactionTaskTracker::get_completed_tasks( + int64_t tablet_id, int64_t top_n, const std::string& compaction_type, + int success_filter) const { + int32_t max = config::compaction_task_tracker_max_records; + if (max <= 0) { + return {}; + } + + std::shared_lock rlock(_mutex); + std::vector<CompactionTaskInfo> result; + int32_t count = 0; + // Iterate in reverse order (newest first). 
+ for (auto it = _completed_tasks.rbegin(); it != _completed_tasks.rend(); ++it) { + if (count >= max) { + break; + } + count++; + const auto& record = *it; + if (tablet_id != 0 && record.tablet_id != tablet_id) { + continue; + } + if (!compaction_type.empty() && compaction_type != to_string(record.compaction_type)) { + continue; + } + if (success_filter == 1 && record.status != CompactionTaskStatus::FINISHED) { + continue; + } + if (success_filter == 0 && record.status != CompactionTaskStatus::FAILED) { + continue; + } + result.push_back(record); + if (top_n > 0 && static_cast<int64_t>(result.size()) >= top_n) { + break; + } + } + return result; +} + +void CompactionTaskTracker::clear_for_test() { + std::unique_lock wlock(_mutex); + _active_tasks.clear(); + _completed_tasks.clear(); +} + +} // namespace doris diff --git a/be/src/storage/compaction_task_tracker.h b/be/src/storage/compaction_task_tracker.h new file mode 100644 index 00000000000000..52b84883c6baf8 --- /dev/null +++ b/be/src/storage/compaction_task_tracker.h @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <atomic> +#include <cstdint> +#include <deque> +#include <shared_mutex> +#include <string> +#include <unordered_map> +#include <vector> + +namespace doris { + +// Standardized compaction type enum covering base / cumulative / full. +enum class CompactionProfileType : uint8_t { + BASE = 0, + CUMULATIVE = 1, + FULL = 2, +}; + +const char* to_string(CompactionProfileType type); + +// Task lifecycle status. +enum class CompactionTaskStatus : uint8_t { + PENDING = 0, + RUNNING = 1, + FINISHED = 2, + FAILED = 3, +}; + +const char* to_string(CompactionTaskStatus status); + +// How the compaction was triggered. +enum class TriggerMethod : uint8_t { + AUTO = 0, + MANUAL = 1, + LOAD_TRIGGERED = 2, +}; + +const char* to_string(TriggerMethod method); + +// Incremental info when transitioning from PENDING to RUNNING. +struct RunningStats { + int64_t start_time_ms {0}; + bool is_vertical {false}; + int64_t permits {0}; +}; + +// Result info collected when a task completes or fails. +struct CompletionStats { + std::string input_version_range; // backfill: may not be available at register time + int64_t end_time_ms {0}; + int64_t merged_rows {0}; + int64_t filtered_rows {0}; + int64_t output_rows {0}; // _stats.output_rows (Merger statistics) + int64_t output_row_num {0}; + int64_t output_data_size {0}; + int64_t output_index_size {0}; // _output_rowset->index_disk_size() + int64_t output_total_size {0}; // _output_rowset->total_disk_size() + int64_t output_segments_num {0}; + std::string output_version; + int64_t merge_latency_ms {0}; // _merge_rowsets_latency_timer (converted to ms) + int64_t bytes_read_from_local {0}; + int64_t bytes_read_from_remote {0}; + int64_t peak_memory_bytes {0}; +}; + +// Unified metadata describing a compaction task across its full lifecycle. 
+struct CompactionTaskInfo { + // ===== Identity ===== + int64_t compaction_id {0}; // unique task ID assigned by Tracker + int64_t backend_id {0}; // BE node ID + int64_t table_id {0}; // table ID + int64_t partition_id {0}; // partition ID + int64_t tablet_id {0}; // tablet ID + + // ===== Task attributes ===== + CompactionProfileType compaction_type {CompactionProfileType::BASE}; + CompactionTaskStatus status {CompactionTaskStatus::PENDING}; + TriggerMethod trigger_method {TriggerMethod::AUTO}; + int64_t compaction_score {0}; // tablet compaction score at register time + + // ===== Timestamps ===== + int64_t scheduled_time_ms {0}; // task registration time + int64_t start_time_ms {0}; // task execution start time (0 while PENDING) + int64_t end_time_ms {0}; // task end time (0 while not completed) + + // ===== Input statistics (available after prepare_compact) ===== + int64_t input_rowsets_count {0}; + int64_t input_row_num {0}; + int64_t input_data_size {0}; // bytes, corresponds to _input_rowsets_data_size + int64_t input_index_size {0}; // bytes, corresponds to _input_rowsets_index_size + int64_t input_total_size {0}; // bytes, = data + index + int64_t input_segments_num {0}; + std::string input_version_range; // e.g. "[0-5]" + + // ===== Output statistics (written at complete/fail) ===== + int64_t merged_rows {0}; + int64_t filtered_rows {0}; + int64_t output_rows {0}; // Merger output rows (_stats.output_rows; 0 for ordered path) + int64_t output_row_num {0}; // from _output_rowset->num_rows() + int64_t output_data_size {0}; // bytes, from _output_rowset->data_disk_size() + int64_t output_index_size {0}; // bytes, from _output_rowset->index_disk_size() + int64_t output_total_size {0}; // bytes, from _output_rowset->total_disk_size() + int64_t output_segments_num {0}; + std::string output_version; // e.g. 
"[0-5]" + + // ===== Merge performance ===== + int64_t merge_latency_ms {0}; // merge rowsets latency (ms; 0 for ordered path) + + // ===== IO statistics (written at complete/fail) ===== + int64_t bytes_read_from_local {0}; + int64_t bytes_read_from_remote {0}; + + // ===== Resources ===== + int64_t peak_memory_bytes {0}; // peak memory usage (bytes) + bool is_vertical {false}; // whether vertical merge is used + int64_t permits {0}; // compaction permits used + + // ===== Vertical compaction progress ===== + int64_t vertical_total_groups {0}; // total column groups (0 for horizontal) + int64_t vertical_completed_groups { + 0}; // completed column groups (updated in real-time during RUNNING) + + // ===== Error ===== + std::string status_msg; // failure message (empty on success) +}; + +// Global singleton managing compaction task lifecycle. +// Receives push reports from compaction entries and execution layer, +// provides pull query interfaces for system table and HTTP API. +class CompactionTaskTracker { +public: + static CompactionTaskTracker* instance(); + + // ID allocation: globally unique monotonically increasing, restarts from 1 after BE restart. + int64_t next_compaction_id() { return _next_id.fetch_add(1, std::memory_order_relaxed); } + + // ===== Push interfaces: lifecycle management (write lock) ===== + // All push interfaces are no-op when enable_compaction_task_tracker=false. + void register_task(CompactionTaskInfo info); + void update_to_running(int64_t compaction_id, const RunningStats& stats); + void update_progress(int64_t compaction_id, int64_t total_groups, int64_t completed_groups); + void complete(int64_t compaction_id, const CompletionStats& stats); + void fail(int64_t compaction_id, const CompletionStats& stats, const std::string& msg); + void remove_task(int64_t compaction_id); + + // ===== Pull interfaces: queries (read lock) ===== + // For system table: returns full snapshot copy of _active_tasks + _completed_tasks. 
+ std::vector<CompactionTaskInfo> get_all_tasks() const; + + // For HTTP API: iterates _completed_tasks only, returns filtered subset copy. + std::vector<CompactionTaskInfo> get_completed_tasks(int64_t tablet_id = 0, int64_t top_n = 0, + const std::string& compaction_type = "", + int success_filter = -1) const; + + // Test only: clear all active and completed tasks. + void clear_for_test(); + +private: + CompactionTaskTracker() = default; + + void _apply_completion(CompactionTaskInfo& info, const CompletionStats& stats); + void _trim_completed_locked(); + + std::atomic<int64_t> _next_id {1}; + + mutable std::shared_mutex _mutex; + + // Active tasks (PENDING + RUNNING), indexed by compaction_id. + // Removed on complete/fail and moved to _completed_tasks. + std::unordered_map<int64_t, CompactionTaskInfo> _active_tasks; + + // Completed tasks (FINISHED + FAILED), FIFO ring buffer. + // Oldest records are evicted when exceeding compaction_task_tracker_max_records. + std::deque<CompactionTaskInfo> _completed_tasks; +}; + +} // namespace doris diff --git a/be/src/storage/merger.cpp b/be/src/storage/merger.cpp index 364ddffe57959d..ec246f13c16d5d 100644 --- a/be/src/storage/merger.cpp +++ b/be/src/storage/merger.cpp @@ -474,7 +474,8 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, RowsetWriter* dst_rowset_writer, uint32_t max_rows_per_segment, int64_t merge_way_num, - Statistics* stats_output) { + Statistics* stats_output, + VerticalCompactionProgressCallback progress_cb) { LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id(); std::vector<std::vector<uint32_t>> column_groups; std::vector<uint32_t> key_group_cluster_key_idxes; @@ -506,6 +507,10 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t vertical_split_columns(tablet_schema, &column_groups, &key_group_cluster_key_idxes, num_columns_per_group); + if (progress_cb) { + progress_cb(column_groups.size(), 0); + } + // Calculate total rows for density calculation after compaction int64_t total_rows = 0; for (const 
auto& rs_reader : src_rowset_readers) { @@ -576,6 +581,9 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t total_stats.rowid_conversion = group_stats.rowid_conversion; } } + if (progress_cb) { + progress_cb(column_groups.size(), i + 1); + } if (is_key) { RETURN_IF_ERROR(row_sources_buf.flush()); } diff --git a/be/src/storage/merger.h b/be/src/storage/merger.h index 62d9e4fed0e6b3..4dcf4787c2005b 100644 --- a/be/src/storage/merger.h +++ b/be/src/storage/merger.h @@ -17,6 +17,7 @@ #pragma once +#include <functional> #include #include "common/status.h" @@ -38,6 +39,9 @@ class SegmentWriter; class RowSourcesBuffer; class VerticalBlockReader; +using VerticalCompactionProgressCallback = + std::function<void(int64_t total_groups, int64_t completed_groups)>; + class Merger { public: struct Statistics { @@ -66,7 +70,7 @@ class Merger { BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema, const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, RowsetWriter* dst_rowset_writer, uint32_t max_rows_per_segment, int64_t merge_way_num, - Statistics* stats_output); + Statistics* stats_output, VerticalCompactionProgressCallback progress_cb = nullptr); // for vertical compaction static void vertical_split_columns(const TabletSchema& tablet_schema, diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index 498cc8e6c3115d..72c6fa8440e498 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -64,6 +64,7 @@ #include "storage/compaction/cumulative_compaction_policy.h" #include "storage/compaction/cumulative_compaction_time_series_policy.h" #include "storage/compaction/single_replica_compaction.h" +#include "storage/compaction_task_tracker.h" #include "storage/data_dir.h" #include "storage/olap_common.h" #include "storage/olap_define.h" @@ -1053,7 +1054,8 @@ void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet } Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, - CompactionType compaction_type, bool force) { + 
CompactionType compaction_type, bool force, + int trigger_method) { if (tablet->tablet_meta()->tablet_schema()->enable_single_replica_compaction() && should_fetch_from_peer(tablet->tablet_id())) { VLOG_CRITICAL << "start to submit single replica compaction task for tablet: " @@ -1081,6 +1083,32 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, if (!force) { _permit_limiter.request(permits); } + // Register task with CompactionTaskTracker as PENDING + auto* tracker = CompactionTaskTracker::instance(); + int64_t compaction_id = compaction->compaction_id(); + { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.tablet_id = tablet->tablet_id(); + info.table_id = tablet->get_table_id(); + info.partition_id = tablet->partition_id(); + info.compaction_type = (compaction_type == CompactionType::BASE_COMPACTION) + ? CompactionProfileType::BASE + : (compaction_type == CompactionType::CUMULATIVE_COMPACTION) + ? CompactionProfileType::CUMULATIVE + : CompactionProfileType::FULL; + info.status = CompactionTaskStatus::PENDING; + info.trigger_method = static_cast<TriggerMethod>(trigger_method); + info.scheduled_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + info.permits = permits; + info.input_rowsets_count = compaction->input_rowsets_count(); + info.input_row_num = compaction->input_row_num_value(); + info.input_data_size = compaction->input_rowsets_data_size(); + info.input_segments_num = compaction->input_segments_num_value(); + tracker->register_task(std::move(info)); + } std::unique_ptr<ThreadPool>& thread_pool = (compaction_type == CompactionType::CUMULATIVE_COMPACTION) ? 
_cumu_compaction_thread_pool @@ -1096,7 +1124,7 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); auto status = thread_pool->submit_func([=, compaction = std::move(compaction), this]() { _handle_compaction(std::move(tablet), std::move(compaction), compaction_type, permits, - force); + force, compaction_id); }); if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] { DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( @@ -1106,6 +1134,8 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, _base_compaction_thread_pool->get_queue_size()); } if (!st.ok()) { + // Cleanup tracker on submit failure + tracker->remove_task(compaction_id); if (!force) { _permit_limiter.release(permits); } @@ -1134,8 +1164,8 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, void StorageEngine::_handle_compaction(TabletSharedPtr tablet, std::shared_ptr compaction, - CompactionType compaction_type, int64_t permits, - bool force) { + CompactionType compaction_type, int64_t permits, bool force, + int64_t compaction_id) { if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] { DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1); DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( @@ -1148,6 +1178,8 @@ void StorageEngine::_handle_compaction(TabletSharedPtr tablet, bool is_large_task = true; Defer defer {[&]() { DBUG_EXECUTE_IF("StorageEngine._submit_compaction_task.sleep", { sleep(5); }) + // Idempotent cleanup: remove task from tracker + CompactionTaskTracker::instance()->remove_task(compaction_id); if (!force) { _permit_limiter.release(permits); } @@ -1219,12 +1251,21 @@ void StorageEngine::_handle_compaction(TabletSharedPtr tablet, return; } tablet->compaction_stage = CompactionStage::EXECUTING; + // Update tracker to RUNNING + { + RunningStats 
rs; + rs.start_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + rs.permits = permits; + CompactionTaskTracker::instance()->update_to_running(compaction_id, rs); + } TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction"); tablet->execute_compaction(*compaction); } Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force, bool eager) { + bool force, bool eager, int trigger_method) { if (!eager) { DCHECK(compaction_type == CompactionType::BASE_COMPACTION || compaction_type == CompactionType::CUMULATIVE_COMPACTION); @@ -1254,7 +1295,7 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionT _cumulative_compaction_policies.at(tablet->tablet_meta()->compaction_policy())); } tablet->set_skip_compaction(false); - return _submit_compaction_task(tablet, compaction_type, force); + return _submit_compaction_task(tablet, compaction_type, force, trigger_method); } Status StorageEngine::_handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker, diff --git a/be/src/storage/rowset_builder.cpp b/be/src/storage/rowset_builder.cpp index a5725eb8d92d94..a2008679a3e6c5 100644 --- a/be/src/storage/rowset_builder.cpp +++ b/be/src/storage/rowset_builder.cpp @@ -167,8 +167,8 @@ Status RowsetBuilder::check_tablet_version_count() { (version_count > max_version_config - 100) && !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { // Trigger compaction - auto st = _engine.submit_compaction_task(tablet_sptr(), - CompactionType::CUMULATIVE_COMPACTION, true); + auto st = _engine.submit_compaction_task( + tablet_sptr(), CompactionType::CUMULATIVE_COMPACTION, true, true, 2); if (!st.ok()) [[unlikely]] { LOG(WARNING) << "failed to trigger compaction, tablet_id=" << _tablet->tablet_id() << " : " << st; diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h index 8b50d1c4d9bf65..0f23cba67b0b3b 100644 --- 
a/be/src/storage/storage_engine.h +++ b/be/src/storage/storage_engine.h @@ -349,7 +349,7 @@ class StorageEngine final : public BaseStorageEngine { void get_compaction_status_json(std::string* result); Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force, bool eager = true); + bool force, bool eager = true, int trigger_method = 0); Status submit_seg_compaction_task(std::shared_ptr worker, SegCompactionCandidatesSharedPtr segments); @@ -445,10 +445,11 @@ class StorageEngine final : public BaseStorageEngine { CompactionType compaction_type); Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, - bool force); + bool force, int trigger_method = 0); void _handle_compaction(TabletSharedPtr tablet, std::shared_ptr compaction, - CompactionType compaction_type, int64_t permits, bool force); + CompactionType compaction_type, int64_t permits, bool force, + int64_t compaction_id = 0); Status _submit_single_replica_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type); diff --git a/be/test/storage/compaction_task_tracker_test.cpp b/be/test/storage/compaction_task_tracker_test.cpp new file mode 100644 index 00000000000000..743ef9dc945eed --- /dev/null +++ b/be/test/storage/compaction_task_tracker_test.cpp @@ -0,0 +1,769 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/compaction_task_tracker.h" + +#include + +#include +#include +#include + +#include "common/config.h" + +namespace doris { + +class CompactionTaskTrackerTest : public testing::Test { +protected: + void SetUp() override { + _saved_enable = config::enable_compaction_task_tracker; + _saved_max_records = config::compaction_task_tracker_max_records; + config::enable_compaction_task_tracker = true; + config::compaction_task_tracker_max_records = 10000; + } + + void TearDown() override { + // Drain all tasks from the singleton so tests are independent. + _clear_tracker(); + config::enable_compaction_task_tracker = _saved_enable; + config::compaction_task_tracker_max_records = _saved_max_records; + } + + CompactionTaskTracker* tracker() { return CompactionTaskTracker::instance(); } + + // Helper: create a CompactionTaskInfo with sensible defaults for testing. 
+ CompactionTaskInfo make_info(int64_t compaction_id, int64_t tablet_id = 1001, + CompactionProfileType type = CompactionProfileType::CUMULATIVE, + CompactionTaskStatus status = CompactionTaskStatus::PENDING, + TriggerMethod trigger = TriggerMethod::AUTO) { + CompactionTaskInfo info; + info.compaction_id = compaction_id; + info.backend_id = 10001; + info.table_id = 2001; + info.partition_id = 3001; + info.tablet_id = tablet_id; + info.compaction_type = type; + info.status = status; + info.trigger_method = trigger; + info.compaction_score = 42; + info.scheduled_time_ms = 1000000; + info.input_rowsets_count = 5; + info.input_row_num = 50000; + info.input_data_size = 10000000; + info.input_index_size = 200000; + info.input_total_size = 10200000; + info.input_segments_num = 5; + info.input_version_range = "[0-5]"; + return info; + } + + // Helper: create a CompletionStats with test data. + CompletionStats make_completion_stats() { + CompletionStats stats; + stats.input_version_range = "[0-5]"; + stats.end_time_ms = 2000000; + stats.merged_rows = 1000; + stats.filtered_rows = 50; + stats.output_rows = 48950; + stats.output_row_num = 48950; + stats.output_data_size = 5000000; + stats.output_index_size = 100000; + stats.output_total_size = 5100000; + stats.output_segments_num = 1; + stats.output_version = "[0-5]"; + stats.merge_latency_ms = 200; + stats.bytes_read_from_local = 10000000; + stats.bytes_read_from_remote = 0; + stats.peak_memory_bytes = 33554432; + return stats; + } + + // Helper: find a task by compaction_id in a vector. + const CompactionTaskInfo* find_task(const std::vector& tasks, + int64_t compaction_id) { + for (const auto& t : tasks) { + if (t.compaction_id == compaction_id) { + return &t; + } + } + return nullptr; + } + +private: + void _clear_tracker() { tracker()->clear_for_test(); } + + bool _saved_enable; + int32_t _saved_max_records; +}; + +// 1. 
Full lifecycle: PENDING -> RUNNING -> update_progress -> FINISHED +TEST_F(CompactionTaskTrackerTest, FullLifecycle_PendingToRunningToFinished) { + int64_t id = tracker()->next_compaction_id(); + auto info = make_info(id); + tracker()->register_task(info); + + // Verify PENDING state. + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::PENDING); + EXPECT_EQ(task->tablet_id, 1001); + EXPECT_EQ(task->compaction_type, CompactionProfileType::CUMULATIVE); + EXPECT_EQ(task->trigger_method, TriggerMethod::AUTO); + EXPECT_EQ(task->compaction_score, 42); + EXPECT_EQ(task->scheduled_time_ms, 1000000); + EXPECT_EQ(task->start_time_ms, 0); + EXPECT_EQ(task->end_time_ms, 0); + EXPECT_EQ(task->input_rowsets_count, 5); + EXPECT_EQ(task->input_row_num, 50000); + EXPECT_EQ(task->input_data_size, 10000000); + EXPECT_EQ(task->is_vertical, false); + EXPECT_EQ(task->permits, 0); + } + + // Transition to RUNNING. + RunningStats rs; + rs.start_time_ms = 1500000; + rs.is_vertical = true; + rs.permits = 10200000; + tracker()->update_to_running(id, rs); + + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::RUNNING); + EXPECT_EQ(task->start_time_ms, 1500000); + EXPECT_EQ(task->is_vertical, true); + EXPECT_EQ(task->permits, 10200000); + // Identity and input stats should be preserved. + EXPECT_EQ(task->tablet_id, 1001); + EXPECT_EQ(task->input_rowsets_count, 5); + // Output not yet available. + EXPECT_EQ(task->output_row_num, 0); + EXPECT_EQ(task->end_time_ms, 0); + } + + // Update progress. + tracker()->update_progress(id, 4, 2); + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->vertical_total_groups, 4); + EXPECT_EQ(task->vertical_completed_groups, 2); + } + + // Complete. 
+ auto stats = make_completion_stats(); + tracker()->complete(id, stats); + + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); + EXPECT_EQ(task->end_time_ms, 2000000); + EXPECT_EQ(task->merged_rows, 1000); + EXPECT_EQ(task->filtered_rows, 50); + EXPECT_EQ(task->output_rows, 48950); + EXPECT_EQ(task->output_row_num, 48950); + EXPECT_EQ(task->output_data_size, 5000000); + EXPECT_EQ(task->output_index_size, 100000); + EXPECT_EQ(task->output_total_size, 5100000); + EXPECT_EQ(task->output_segments_num, 1); + EXPECT_EQ(task->output_version, "[0-5]"); + EXPECT_EQ(task->merge_latency_ms, 200); + EXPECT_EQ(task->bytes_read_from_local, 10000000); + EXPECT_EQ(task->bytes_read_from_remote, 0); + EXPECT_EQ(task->peak_memory_bytes, 33554432); + // Identity preserved through full lifecycle. + EXPECT_EQ(task->compaction_id, id); + EXPECT_EQ(task->tablet_id, 1001); + EXPECT_EQ(task->trigger_method, TriggerMethod::AUTO); + // Running stats preserved. + EXPECT_EQ(task->is_vertical, true); + EXPECT_EQ(task->permits, 10200000); + EXPECT_EQ(task->start_time_ms, 1500000); + } +} + +// 2. Direct running path (manual trigger, no PENDING stage). +TEST_F(CompactionTaskTrackerTest, DirectRunning_ManualTrigger) { + int64_t id = tracker()->next_compaction_id(); + auto info = make_info(id, 2002, CompactionProfileType::BASE, CompactionTaskStatus::RUNNING, + TriggerMethod::MANUAL); + info.start_time_ms = 1000000; + info.is_vertical = false; + info.permits = 5000000; + tracker()->register_task(info); + + // Verify directly RUNNING. 
+ { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::RUNNING); + EXPECT_EQ(task->trigger_method, TriggerMethod::MANUAL); + EXPECT_EQ(task->compaction_type, CompactionProfileType::BASE); + EXPECT_EQ(task->start_time_ms, 1000000); + } + + // Complete. + tracker()->complete(id, make_completion_stats()); + + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); + EXPECT_EQ(task->trigger_method, TriggerMethod::MANUAL); + } +} + +// 3. Fail path: status_msg is preserved. +TEST_F(CompactionTaskTrackerTest, FailPath_StatusMsgPreserved) { + int64_t id = tracker()->next_compaction_id(); + tracker()->register_task(make_info(id)); + + RunningStats rs; + rs.start_time_ms = 1500000; + rs.is_vertical = false; + rs.permits = 1000; + tracker()->update_to_running(id, rs); + + std::string error_msg = "checksum verification failed"; + auto stats = make_completion_stats(); + tracker()->fail(id, stats, error_msg); + + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FAILED); + EXPECT_EQ(task->status_msg, "checksum verification failed"); + // Output stats should still be filled even on failure. + EXPECT_EQ(task->output_row_num, 48950); + EXPECT_EQ(task->end_time_ms, 2000000); +} + +// 4. Prepare failure: register then remove_task cleans up. +TEST_F(CompactionTaskTrackerTest, PrepareFailure_RemoveTask) { + int64_t id = tracker()->next_compaction_id(); + tracker()->register_task(make_info(id)); + + // Verify task exists. + { + auto tasks = tracker()->get_all_tasks(); + EXPECT_NE(find_task(tasks, id), nullptr); + } + + // Remove (simulating prepare failure cleanup). + tracker()->remove_task(id); + + // Task should be gone. 
+ { + auto tasks = tracker()->get_all_tasks(); + EXPECT_EQ(find_task(tasks, id), nullptr); + } +} + +// 5. Thread pool submit failure: register(PENDING) then remove_task. +TEST_F(CompactionTaskTrackerTest, ThreadPoolSubmitFailure_Cleanup) { + int64_t id = tracker()->next_compaction_id(); + auto info = make_info(id); + info.status = CompactionTaskStatus::PENDING; + tracker()->register_task(info); + + // Verify PENDING. + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::PENDING); + } + + // Simulate thread pool submit failure -> cleanup. + tracker()->remove_task(id); + + // No dirty PENDING record should remain. + { + auto tasks = tracker()->get_all_tasks(); + EXPECT_EQ(find_task(tasks, id), nullptr); + } +} + +// 6. TRY_LOCK_FAILED via queue path: PENDING -> RUNNING -> remove_task. +TEST_F(CompactionTaskTrackerTest, TryLockFailed_QueuePath) { + int64_t id = tracker()->next_compaction_id(); + tracker()->register_task(make_info(id)); + + RunningStats rs; + rs.start_time_ms = 1500000; + rs.is_vertical = false; + rs.permits = 1000; + tracker()->update_to_running(id, rs); + + // Verify RUNNING. + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::RUNNING); + } + + // execute_compact returns TRY_LOCK_FAILED, submit_profile_record not called. + // _handle_compaction does idempotent remove_task. + tracker()->remove_task(id); + + // Task should be cleaned up. + { + auto tasks = tracker()->get_all_tasks(); + EXPECT_EQ(find_task(tasks, id), nullptr); + } +} + +// 7. TRY_LOCK_FAILED via direct path: register(RUNNING) -> remove_task. 
+TEST_F(CompactionTaskTrackerTest, TryLockFailed_DirectPath) { + int64_t id = tracker()->next_compaction_id(); + auto info = make_info(id, 1001, CompactionProfileType::BASE, CompactionTaskStatus::RUNNING, + TriggerMethod::MANUAL); + info.start_time_ms = 1000000; + tracker()->register_task(info); + + // Verify RUNNING. + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::RUNNING); + } + + // Simulate TRY_LOCK_FAILED: execute returns early, remove_task cleans up. + tracker()->remove_task(id); + + { + auto tasks = tracker()->get_all_tasks(); + EXPECT_EQ(find_task(tasks, id), nullptr); + } +} + +// 8. Missed register: complete with unknown ID should not crash and should not create a record. +TEST_F(CompactionTaskTrackerTest, MissedRegister_SilentSkip) { + int64_t unknown_id = tracker()->next_compaction_id(); + + // complete() with an unregistered ID: should be a silent no-op (WARNING log). + tracker()->complete(unknown_id, make_completion_stats()); + + // No record should exist for this ID. + auto tasks = tracker()->get_all_tasks(); + EXPECT_EQ(find_task(tasks, unknown_id), nullptr); + + // Also test fail() with unknown ID. + int64_t unknown_id2 = tracker()->next_compaction_id(); + tracker()->fail(unknown_id2, make_completion_stats(), "some error"); + tasks = tracker()->get_all_tasks(); + EXPECT_EQ(find_task(tasks, unknown_id2), nullptr); +} + +// 9. remove_task is idempotent: register -> complete -> remove_task should not affect completed. +TEST_F(CompactionTaskTrackerTest, RemoveTask_Idempotent) { + int64_t id = tracker()->next_compaction_id(); + tracker()->register_task(make_info(id)); + tracker()->complete(id, make_completion_stats()); + + // Verify task is FINISHED. 
+ { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); + } + + // remove_task after complete: should be no-op since task was already + // extracted from _active_tasks by complete(). + tracker()->remove_task(id); + + // Completed record should still exist. + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); + } +} + +// 10. Vertical compaction progress tracking. +TEST_F(CompactionTaskTrackerTest, VerticalProgress) { + int64_t id = tracker()->next_compaction_id(); + tracker()->register_task(make_info(id)); + + RunningStats rs; + rs.start_time_ms = 1500000; + rs.is_vertical = true; + rs.permits = 5000000; + tracker()->update_to_running(id, rs); + + // Initial progress: total_groups=4, completed=0. + tracker()->update_progress(id, 4, 0); + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->vertical_total_groups, 4); + EXPECT_EQ(task->vertical_completed_groups, 0); + } + + // Progress: 2 of 4 groups done. + tracker()->update_progress(id, 4, 2); + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->vertical_total_groups, 4); + EXPECT_EQ(task->vertical_completed_groups, 2); + } + + // All groups done. + tracker()->update_progress(id, 4, 4); + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->vertical_total_groups, 4); + EXPECT_EQ(task->vertical_completed_groups, 4); + } + + // Complete. 
+ tracker()->complete(id, make_completion_stats()); + { + auto tasks = tracker()->get_all_tasks(); + const auto* task = find_task(tasks, id); + ASSERT_NE(task, nullptr); + EXPECT_EQ(task->status, CompactionTaskStatus::FINISHED); + EXPECT_EQ(task->vertical_total_groups, 4); + EXPECT_EQ(task->vertical_completed_groups, 4); + } +} + +// 11. Trim completed records when exceeding max_records. +TEST_F(CompactionTaskTrackerTest, TrimCompleted) { + config::compaction_task_tracker_max_records = 5; + + // Add 10 completed tasks. + std::vector<int64_t> ids; + for (int i = 0; i < 10; i++) { + int64_t id = tracker()->next_compaction_id(); + ids.push_back(id); + auto info = make_info(id, 1001 + i); + tracker()->register_task(info); + tracker()->complete(id, make_completion_stats()); + } + + // Only the last 5 should remain in completed tasks. + auto completed = tracker()->get_completed_tasks(); + EXPECT_EQ(completed.size(), 5); + + // The oldest 5 should have been evicted; the newest 5 should remain. + for (int i = 0; i < 5; i++) { + EXPECT_EQ(find_task(completed, ids[i]), nullptr) + << "Task " << ids[i] << " (index " << i << ") should have been evicted"; + } + for (int i = 5; i < 10; i++) { + EXPECT_NE(find_task(completed, ids[i]), nullptr) + << "Task " << ids[i] << " (index " << i << ") should be present"; + } + + // get_all_tasks should also show exactly 5 (no active tasks remain). + auto all = tracker()->get_all_tasks(); + EXPECT_EQ(all.size(), 5); +} + +// 12. Disable switch: when disabled, push operations are no-ops, queries return empty. +TEST_F(CompactionTaskTrackerTest, DisableSwitch) { + config::enable_compaction_task_tracker = false; + + int64_t id = tracker()->next_compaction_id(); + tracker()->register_task(make_info(id)); + + // get_all_tasks should be empty because register was a no-op. + auto tasks = tracker()->get_all_tasks(); + EXPECT_TRUE(tasks.empty()); + + // Re-enable: registrations should work again. 
+ config::enable_compaction_task_tracker = true; + + int64_t id2 = tracker()->next_compaction_id(); + tracker()->register_task(make_info(id2)); + + tasks = tracker()->get_all_tasks(); + EXPECT_EQ(tasks.size(), 1); + EXPECT_NE(find_task(tasks, id2), nullptr); + + // The first ID should still not be present (was registered while disabled). + EXPECT_EQ(find_task(tasks, id), nullptr); +} + +// 13. get_completed_tasks filter tests. +TEST_F(CompactionTaskTrackerTest, GetCompletedTasks_Filters) { + // Create a diverse set of completed tasks: + // T1: tablet=1001, CUMULATIVE, FINISHED + // T2: tablet=1001, BASE, FAILED + // T3: tablet=2002, CUMULATIVE, FINISHED + // T4: tablet=2002, FULL, FINISHED + // T5: tablet=1001, CUMULATIVE, FINISHED + + auto register_and_complete = [&](int64_t tablet_id, CompactionProfileType type, + bool success) -> int64_t { + int64_t id = tracker()->next_compaction_id(); + auto info = make_info(id, tablet_id, type); + tracker()->register_task(info); + if (success) { + tracker()->complete(id, make_completion_stats()); + } else { + tracker()->fail(id, make_completion_stats(), "test failure"); + } + return id; + }; + + int64_t t1 = register_and_complete(1001, CompactionProfileType::CUMULATIVE, true); + int64_t t2 = register_and_complete(1001, CompactionProfileType::BASE, false); + int64_t t3 = register_and_complete(2002, CompactionProfileType::CUMULATIVE, true); + int64_t t4 = register_and_complete(2002, CompactionProfileType::FULL, true); + int64_t t5 = register_and_complete(1001, CompactionProfileType::CUMULATIVE, true); + + // No filter: should return all 5. + { + auto result = tracker()->get_completed_tasks(); + EXPECT_EQ(result.size(), 5); + } + + // Filter by tablet_id=1001: should return 3 (t1, t2, t5). + { + auto result = tracker()->get_completed_tasks(1001); + EXPECT_EQ(result.size(), 3); + for (const auto& r : result) { + EXPECT_EQ(r.tablet_id, 1001); + } + } + + // Filter by tablet_id=2002: should return 2 (t3, t4). 
+ { + auto result = tracker()->get_completed_tasks(2002); + EXPECT_EQ(result.size(), 2); + } + + // top_n=2: should return only 2 (newest first: t5, t4). + { + auto result = tracker()->get_completed_tasks(0, 2); + EXPECT_EQ(result.size(), 2); + // Newest first (reverse iteration): t5 then t4. + EXPECT_EQ(result[0].compaction_id, t5); + EXPECT_EQ(result[1].compaction_id, t4); + } + + // compact_type="cumulative": should return t1, t3, t5. + { + auto result = tracker()->get_completed_tasks(0, 0, "cumulative"); + EXPECT_EQ(result.size(), 3); + for (const auto& r : result) { + EXPECT_EQ(r.compaction_type, CompactionProfileType::CUMULATIVE); + } + } + + // compact_type="base": should return t2 only. + { + auto result = tracker()->get_completed_tasks(0, 0, "base"); + EXPECT_EQ(result.size(), 1); + EXPECT_EQ(result[0].compaction_id, t2); + } + + // compact_type="full": should return t4 only. + { + auto result = tracker()->get_completed_tasks(0, 0, "full"); + EXPECT_EQ(result.size(), 1); + EXPECT_EQ(result[0].compaction_id, t4); + } + + // success_filter=1 (success only): should return t1, t3, t4, t5 (not t2). + { + auto result = tracker()->get_completed_tasks(0, 0, "", 1); + EXPECT_EQ(result.size(), 4); + for (const auto& r : result) { + EXPECT_EQ(r.status, CompactionTaskStatus::FINISHED); + } + } + + // success_filter=0 (failed only): should return t2 only. + { + auto result = tracker()->get_completed_tasks(0, 0, "", 0); + EXPECT_EQ(result.size(), 1); + EXPECT_EQ(result[0].compaction_id, t2); + EXPECT_EQ(result[0].status, CompactionTaskStatus::FAILED); + } + + // Combined: tablet_id=1001 + compact_type="cumulative" + success_filter=1. + // Should return t1, t5. 
+ { + auto result = tracker()->get_completed_tasks(1001, 0, "cumulative", 1); + EXPECT_EQ(result.size(), 2); + for (const auto& r : result) { + EXPECT_EQ(r.tablet_id, 1001); + EXPECT_EQ(r.compaction_type, CompactionProfileType::CUMULATIVE); + EXPECT_EQ(r.status, CompactionTaskStatus::FINISHED); + } + } + + // Combined with top_n: tablet_id=1001 + top_n=1. + { + auto result = tracker()->get_completed_tasks(1001, 1); + EXPECT_EQ(result.size(), 1); + EXPECT_EQ(result[0].compaction_id, t5); // newest matching + } + + // Suppress unused variable warnings. + (void)t1; + (void)t3; +} + +// 14. Concurrent safety: multiple threads doing register/update/complete/query simultaneously. +TEST_F(CompactionTaskTrackerTest, ConcurrentSafety) { + const int num_tasks_per_thread = 50; + const int num_threads = 8; + + auto worker = [&](int thread_idx) { + for (int i = 0; i < num_tasks_per_thread; i++) { + int64_t id = tracker()->next_compaction_id(); + auto info = make_info(id, 1000 + thread_idx); + tracker()->register_task(info); + + RunningStats rs; + rs.start_time_ms = 1500000; + rs.is_vertical = (i % 2 == 0); + rs.permits = 1000; + tracker()->update_to_running(id, rs); + + if (rs.is_vertical) { + tracker()->update_progress(id, 3, 1); + tracker()->update_progress(id, 3, 3); + } + + // Interleave queries. + auto tasks = tracker()->get_all_tasks(); + (void)tasks; + + if (i % 3 == 0) { + // Simulate failure. + tracker()->fail(id, make_completion_stats(), "test error"); + } else { + tracker()->complete(id, make_completion_stats()); + } + + // Idempotent remove_task after complete/fail. + tracker()->remove_task(id); + } + }; + + std::vector<std::thread> threads; + for (int t = 0; t < num_threads; t++) { + threads.emplace_back(worker, t); + } + for (auto& t : threads) { + t.join(); + } + + // All tasks should be completed. No active tasks should remain. 
+ auto all = tracker()->get_all_tasks(); + int active_count = 0; + int completed_count = 0; + for (const auto& task : all) { + if (task.status == CompactionTaskStatus::PENDING || + task.status == CompactionTaskStatus::RUNNING) { + active_count++; + } else { + completed_count++; + } + } + EXPECT_EQ(active_count, 0); + EXPECT_EQ(completed_count, num_tasks_per_thread * num_threads); +} + +// 15. Trigger method distinction: AUTO, MANUAL, LOAD_TRIGGERED are all preserved. +TEST_F(CompactionTaskTrackerTest, TriggerMethodDistinction) { + int64_t id_auto = tracker()->next_compaction_id(); + auto info_auto = make_info(id_auto, 1001, CompactionProfileType::CUMULATIVE, + CompactionTaskStatus::PENDING, TriggerMethod::AUTO); + tracker()->register_task(info_auto); + + int64_t id_manual = tracker()->next_compaction_id(); + auto info_manual = make_info(id_manual, 1002, CompactionProfileType::BASE, + CompactionTaskStatus::RUNNING, TriggerMethod::MANUAL); + info_manual.start_time_ms = 1000000; + tracker()->register_task(info_manual); + + int64_t id_load = tracker()->next_compaction_id(); + auto info_load = make_info(id_load, 1003, CompactionProfileType::CUMULATIVE, + CompactionTaskStatus::PENDING, TriggerMethod::LOAD_TRIGGERED); + tracker()->register_task(info_load); + + // Verify each trigger method is preserved. + { + auto tasks = tracker()->get_all_tasks(); + const auto* t_auto = find_task(tasks, id_auto); + ASSERT_NE(t_auto, nullptr); + EXPECT_EQ(t_auto->trigger_method, TriggerMethod::AUTO); + + const auto* t_manual = find_task(tasks, id_manual); + ASSERT_NE(t_manual, nullptr); + EXPECT_EQ(t_manual->trigger_method, TriggerMethod::MANUAL); + + const auto* t_load = find_task(tasks, id_load); + ASSERT_NE(t_load, nullptr); + EXPECT_EQ(t_load->trigger_method, TriggerMethod::LOAD_TRIGGERED); + } + + // Complete all and verify trigger methods are still preserved in completed records. 
+ RunningStats rs; + rs.start_time_ms = 1500000; + rs.is_vertical = false; + rs.permits = 1000; + tracker()->update_to_running(id_auto, rs); + tracker()->update_to_running(id_load, rs); + + tracker()->complete(id_auto, make_completion_stats()); + tracker()->complete(id_manual, make_completion_stats()); + tracker()->complete(id_load, make_completion_stats()); + + { + auto completed = tracker()->get_completed_tasks(); + EXPECT_EQ(completed.size(), 3); + + const auto* c_auto = find_task(completed, id_auto); + ASSERT_NE(c_auto, nullptr); + EXPECT_EQ(c_auto->trigger_method, TriggerMethod::AUTO); + + const auto* c_manual = find_task(completed, id_manual); + ASSERT_NE(c_manual, nullptr); + EXPECT_EQ(c_manual->trigger_method, TriggerMethod::MANUAL); + + const auto* c_load = find_task(completed, id_load); + ASSERT_NE(c_load, nullptr); + EXPECT_EQ(c_load->trigger_method, TriggerMethod::LOAD_TRIGGERED); + } +} + +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index ad3d2655dcbdae..d571127a478387 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -116,7 +116,9 @@ public enum SchemaTableType { SCH_DATABASE_PROPERTIES("DATABASE_PROPERTIES", "DATABASE_PROPERTIES", TSchemaTableType.SCH_DATABASE_PROPERTIES), SCH_AUTHENTICATION_INTEGRATIONS("AUTHENTICATION_INTEGRATIONS", "AUTHENTICATION_INTEGRATIONS", - TSchemaTableType.SCH_AUTHENTICATION_INTEGRATIONS); + TSchemaTableType.SCH_AUTHENTICATION_INTEGRATIONS), + SCH_BE_COMPACTION_TASKS("BE_COMPACTION_TASKS", "BE_COMPACTION_TASKS", + TSchemaTableType.SCH_BE_COMPACTION_TASKS); private static final String dbName = "INFORMATION_SCHEMA"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 
6f269113012918..6e14d88f9f538d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -846,6 +846,47 @@ public class SchemaTable extends Table { .column("ALTER_USER", ScalarType.createStringType()) .column("MODIFY_TIME", ScalarType.createStringType()) .build())) + .put("be_compaction_tasks", + new SchemaTable(SystemIdGenerator.getNextId(), "be_compaction_tasks", TableType.SCHEMA, + builder().column("BACKEND_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("COMPACTION_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("TABLE_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("PARTITION_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("TABLET_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("COMPACTION_TYPE", ScalarType.createVarchar(64)) + .column("STATUS", ScalarType.createVarchar(16)) + .column("TRIGGER_METHOD", ScalarType.createVarchar(16)) + .column("COMPACTION_SCORE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("SCHEDULED_TIME", ScalarType.createType(PrimitiveType.DATETIME)) + .column("START_TIME", ScalarType.createType(PrimitiveType.DATETIME)) + .column("END_TIME", ScalarType.createType(PrimitiveType.DATETIME)) + .column("ELAPSED_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_ROWSETS_COUNT", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_ROW_NUM", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_DATA_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_INDEX_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_TOTAL_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_SEGMENTS_NUM", ScalarType.createType(PrimitiveType.BIGINT)) + .column("INPUT_VERSION_RANGE", ScalarType.createVarchar(64)) + .column("MERGED_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("FILTERED_ROWS", 
ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_ROW_NUM", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_DATA_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_INDEX_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_TOTAL_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_SEGMENTS_NUM", ScalarType.createType(PrimitiveType.BIGINT)) + .column("OUTPUT_VERSION", ScalarType.createVarchar(64)) + .column("MERGE_LATENCY_MS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("BYTES_READ_FROM_LOCAL", ScalarType.createType(PrimitiveType.BIGINT)) + .column("BYTES_READ_FROM_REMOTE", ScalarType.createType(PrimitiveType.BIGINT)) + .column("PEAK_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("IS_VERTICAL", ScalarType.createType(PrimitiveType.BOOLEAN)) + .column("PERMITS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("VERTICAL_TOTAL_GROUPS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("VERTICAL_COMPLETED_GROUPS", ScalarType.createType(PrimitiveType.BIGINT)) + .column("STATUS_MSG", ScalarType.createVarchar(1024)) + .build())) .build(); private boolean fetchAllFe = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index 4338ecbabd92bc..9bd01bef5f8f72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -77,6 +77,7 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { BACKEND_TABLE.add("backend_configuration"); BACKEND_TABLE.add("column_data_sizes"); + BACKEND_TABLE.add("be_compaction_tasks"); } public static boolean isBackendPartitionedSchemaTable(String tableName) { diff 
--git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 7d3883c9f9b1e0..8c1270ee8034fc 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -215,6 +215,7 @@ enum TSchemaTableType { SCH_FILE_CACHE_INFO = 65; SCH_DATABASE_PROPERTIES = 66; SCH_AUTHENTICATION_INTEGRATIONS = 67; + SCH_BE_COMPACTION_TASKS = 68; } enum THdfsCompression { diff --git a/regression-test/suites/compaction/test_be_compaction_tasks.groovy b/regression-test/suites/compaction/test_be_compaction_tasks.groovy new file mode 100644 index 00000000000000..0c1df3bd0b3fcb --- /dev/null +++ b/regression-test/suites/compaction/test_be_compaction_tasks.groovy @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_be_compaction_tasks", "p0") { + def tableName = "test_be_compaction_tasks_tbl" + + // Step 1: Setup - get backend info + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + + String backend_id = backendId_to_backendIP.keySet()[0] + + try { + // Step 2: Create table with disable_auto_compaction + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `id` INT NOT NULL, + `name` VARCHAR(128) NOT NULL, + `value` INT NOT NULL, + `ts` DATETIME NOT NULL + ) + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ) + """ + + // Step 3: Insert several batches of data to create multiple rowsets + for (int i = 0; i < 5; i++) { + sql """ INSERT INTO ${tableName} VALUES + (${i * 10 + 1}, 'name_${i}_1', ${i * 100 + 1}, '2025-01-01 00:00:00'), + (${i * 10 + 2}, 'name_${i}_2', ${i * 100 + 2}, '2025-01-01 00:00:01'), + (${i * 10 + 3}, 'name_${i}_3', ${i * 100 + 3}, '2025-01-01 00:00:02'), + (${i * 10 + 4}, 'name_${i}_4', ${i * 100 + 4}, '2025-01-01 00:00:03'), + (${i * 10 + 5}, 'name_${i}_5', ${i * 100 + 5}, '2025-01-01 00:00:04') + """ + } + + // Step 4: Basic query - verify system table returns valid results + def basicResult = sql """ SELECT * FROM information_schema.be_compaction_tasks """ + logger.info("Basic query result rows: " + basicResult.size()) + // The table should be queryable (may or may not have rows depending on BE state) + assertTrue(basicResult != null) + + // Step 5: DESC check - verify 38 columns + def descResult = sql """ DESC information_schema.be_compaction_tasks """ + logger.info("DESC result rows: " + descResult.size()) + assertEquals(38, descResult.size()) + + // Get tablet info for compaction trigger + def tablets = sql_return_maparray """ show tablets from 
${tableName}; """ + assertTrue(tablets.size() > 0) + def tablet = tablets[0] + String tablet_id = tablet.TabletId + + // Step 6: Trigger cumulative compaction via HTTP API and wait for completion + trigger_and_wait_compaction(tableName, "cumulative") + + // Step 7: After compaction - query system table with WHERE STATUS = 'FINISHED' + def finishedResult = sql """ SELECT * FROM information_schema.be_compaction_tasks WHERE STATUS = 'FINISHED' """ + logger.info("Finished compaction tasks: " + finishedResult.size()) + assertTrue(finishedResult.size() > 0, "Expected at least one FINISHED compaction task") + + // Step 8: Field validation - verify key fields are non-null and reasonable + // Query specific fields for the completed compaction on our tablet + def fieldResult = sql_return_maparray """ + SELECT BACKEND_ID, COMPACTION_ID, TABLE_ID, PARTITION_ID, TABLET_ID, + COMPACTION_TYPE, STATUS, TRIGGER_METHOD, COMPACTION_SCORE, + SCHEDULED_TIME, START_TIME, END_TIME, ELAPSED_TIME_MS, + INPUT_ROWSETS_COUNT, INPUT_ROW_NUM, INPUT_DATA_SIZE, + INPUT_SEGMENTS_NUM, OUTPUT_ROW_NUM, OUTPUT_DATA_SIZE, + OUTPUT_SEGMENTS_NUM, IS_VERTICAL + FROM information_schema.be_compaction_tasks + WHERE TABLET_ID = ${tablet_id} AND STATUS = 'FINISHED' + ORDER BY COMPACTION_ID DESC + LIMIT 1 + """ + logger.info("Field validation result: " + fieldResult) + assertTrue(fieldResult.size() > 0, "Expected FINISHED record for tablet " + tablet_id) + + def record = fieldResult[0] + // Verify key fields are non-null + assertTrue(record.BACKEND_ID != null && Long.parseLong(record.BACKEND_ID.toString()) > 0, + "BACKEND_ID should be positive") + assertTrue(record.COMPACTION_ID != null && Long.parseLong(record.COMPACTION_ID.toString()) > 0, + "COMPACTION_ID should be positive") + assertTrue(record.TABLE_ID != null && Long.parseLong(record.TABLE_ID.toString()) > 0, + "TABLE_ID should be positive") + assertTrue(record.TABLET_ID != null && record.TABLET_ID.toString() == tablet_id, + "TABLET_ID should match") + 
assertTrue(record.COMPACTION_TYPE != null && record.COMPACTION_TYPE.toString() == "cumulative", + "COMPACTION_TYPE should be cumulative") + assertTrue(record.STATUS != null && record.STATUS.toString() == "FINISHED", + "STATUS should be FINISHED") + assertTrue(record.SCHEDULED_TIME != null, "SCHEDULED_TIME should not be null") + assertTrue(record.START_TIME != null, "START_TIME should not be null") + assertTrue(record.END_TIME != null, "END_TIME should not be null") + assertTrue(record.INPUT_DATA_SIZE != null && Long.parseLong(record.INPUT_DATA_SIZE.toString()) > 0, + "INPUT_DATA_SIZE should be positive") + assertTrue(record.INPUT_ROWSETS_COUNT != null && Long.parseLong(record.INPUT_ROWSETS_COUNT.toString()) > 0, + "INPUT_ROWSETS_COUNT should be positive") + assertTrue(record.INPUT_ROW_NUM != null && Long.parseLong(record.INPUT_ROW_NUM.toString()) > 0, + "INPUT_ROW_NUM should be positive") + assertTrue(record.OUTPUT_ROW_NUM != null && Long.parseLong(record.OUTPUT_ROW_NUM.toString()) >= 0, + "OUTPUT_ROW_NUM should be non-negative") + assertTrue(record.OUTPUT_DATA_SIZE != null && Long.parseLong(record.OUTPUT_DATA_SIZE.toString()) >= 0, + "OUTPUT_DATA_SIZE should be non-negative") + + // Step 9: TRIGGER_METHOD check - manual triggered should show 'MANUAL' + assertTrue(record.TRIGGER_METHOD != null && record.TRIGGER_METHOD.toString() == "MANUAL", + "TRIGGER_METHOD should be MANUAL for manually triggered compaction, got: " + record.TRIGGER_METHOD) + + // Step 10: Filter test - WHERE tablet_id = X AND status = 'FINISHED' + def filterResult = sql """ + SELECT COUNT(*) FROM information_schema.be_compaction_tasks + WHERE TABLET_ID = ${tablet_id} AND STATUS = 'FINISHED' + """ + logger.info("Filter result (tablet_id + status): " + filterResult) + assertTrue(filterResult[0][0] > 0, "Expected at least one record matching filter") + + // Step 11: Non-existent tablet - WHERE tablet_id = 999999999 returns empty + def emptyResult = sql """ + SELECT * FROM 
information_schema.be_compaction_tasks + WHERE TABLET_ID = 999999999 + """ + logger.info("Non-existent tablet result: " + emptyResult.size()) + assertEquals(0, emptyResult.size()) + + } finally { + // Step 12: Cleanup + try_sql("DROP TABLE IF EXISTS ${tableName} FORCE") + } +} diff --git a/regression-test/suites/compaction/test_compaction_profile_action.groovy b/regression-test/suites/compaction/test_compaction_profile_action.groovy new file mode 100644 index 00000000000000..7e505aa00a75fa --- /dev/null +++ b/regression-test/suites/compaction/test_compaction_profile_action.groovy @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// Regression test for the BE HTTP endpoint GET /api/compaction/profile.
// Creates a single-bucket table with auto compaction disabled, loads five
// small rowsets, triggers a cumulative compaction, then validates the JSON
// profile payload and the endpoint's query-parameter filters
// (top_n / tablet_id / compact_type, alone and combined).
import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_compaction_profile_action", "p0") {
    def tableName = "test_compaction_profile_action_tbl"

    // Step 1: Setup - get backend info
    // getBackendIpHttpPort is a regression-framework helper that fills both
    // maps keyed by backend id.
    def backendId_to_backendIP = [:]
    def backendId_to_backendHttpPort = [:]
    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)

    // NOTE(review): the test always queries the first backend; it assumes
    // that backend hosts the tablet (plausible with BUCKETS 1 and
    // replication_num=1, but confirm on multi-BE clusters).
    String backend_id = backendId_to_backendIP.keySet()[0]
    def beHost = backendId_to_backendIP[backend_id]
    def beHttpPort = backendId_to_backendHttpPort[backend_id]
    def baseUrl = "http://${beHost}:${beHttpPort}/api/compaction/profile"

    try {
        // Step 2: Create table, insert data, trigger compaction, wait for completion
        // disable_auto_compaction=true so the only compaction profile produced
        // is the one this test triggers explicitly below.
        sql """ DROP TABLE IF EXISTS ${tableName} """
        sql """
            CREATE TABLE IF NOT EXISTS ${tableName} (
                `id` INT NOT NULL,
                `name` VARCHAR(128) NOT NULL,
                `value` INT NOT NULL,
                `ts` DATETIME NOT NULL
            )
            DUPLICATE KEY(`id`)
            DISTRIBUTED BY HASH(`id`) BUCKETS 1
            PROPERTIES (
                "replication_num" = "1",
                "disable_auto_compaction" = "true"
            )
        """

        // Insert several batches to create multiple rowsets
        // (each INSERT is one load, i.e. one rowset -> 5 input rowsets).
        for (int i = 0; i < 5; i++) {
            sql """ INSERT INTO ${tableName} VALUES
                (${i * 10 + 1}, 'name_${i}_1', ${i * 100 + 1}, '2025-01-01 00:00:00'),
                (${i * 10 + 2}, 'name_${i}_2', ${i * 100 + 2}, '2025-01-01 00:00:01'),
                (${i * 10 + 3}, 'name_${i}_3', ${i * 100 + 3}, '2025-01-01 00:00:02'),
                (${i * 10 + 4}, 'name_${i}_4', ${i * 100 + 4}, '2025-01-01 00:00:03'),
                (${i * 10 + 5}, 'name_${i}_5', ${i * 100 + 5}, '2025-01-01 00:00:04')
            """
        }

        // Get tablet info (single bucket -> exactly one tablet is expected).
        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
        assertTrue(tablets.size() > 0)
        def tablet = tablets[0]
        String tablet_id = tablet.TabletId

        // Trigger compaction and wait for completion (framework helper; blocks
        // until the cumulative compaction on this table finishes).
        trigger_and_wait_compaction(tableName, "cumulative")

        // Step 3: Basic query - GET /api/compaction/profile - verify JSON response format
        // curl returns (exit code, stdout, stderr); exit code 0 means the HTTP
        // request itself succeeded, the application status is in the JSON body.
        def (code, out, err) = curl("GET", baseUrl)
        logger.info("Basic profile query: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(0, code)
        def json = parseJson(out.trim())
        assertEquals("Success", json.status)

        // Step 4: Verify response contains compaction_profiles array
        assertTrue(json.compaction_profiles != null, "Response should contain compaction_profiles")
        assertTrue(json.compaction_profiles instanceof List, "compaction_profiles should be a list")
        assertTrue(json.compaction_profiles.size() > 0, "compaction_profiles should not be empty after compaction")

        // Step 5: Verify a profile record has expected fields
        // Fields that may legitimately be 0/false are checked with containsKey
        // (presence only); the rest are checked for non-null.
        def profile = json.compaction_profiles[0]
        assertTrue(profile.compaction_id != null, "compaction_id should be present")
        assertTrue(profile.compaction_type != null, "compaction_type should be present")
        assertTrue(profile.tablet_id != null, "tablet_id should be present")
        assertTrue(profile.table_id != null, "table_id should be present")
        assertTrue(profile.partition_id != null, "partition_id should be present")
        assertTrue(profile.trigger_method != null, "trigger_method should be present")
        assertTrue(profile.compaction_score != null, "compaction_score should be present")
        assertTrue(profile.scheduled_time != null, "scheduled_time should be present")
        assertTrue(profile.start_time != null, "start_time should be present")
        assertTrue(profile.end_time != null, "end_time should be present")
        assertTrue(profile.cost_time_ms != null, "cost_time_ms should be present")
        assertTrue(profile.containsKey("success"), "success should be present")
        assertTrue(profile.input_rowsets_count != null, "input_rowsets_count should be present")
        assertTrue(profile.input_row_num != null, "input_row_num should be present")
        assertTrue(profile.input_data_size != null, "input_data_size should be present")
        assertTrue(profile.input_index_size != null, "input_index_size should be present")
        assertTrue(profile.input_total_size != null, "input_total_size should be present")
        assertTrue(profile.input_segments_num != null, "input_segments_num should be present")
        assertTrue(profile.input_version_range != null, "input_version_range should be present")
        assertTrue(profile.containsKey("merged_rows"), "merged_rows should be present")
        assertTrue(profile.containsKey("filtered_rows"), "filtered_rows should be present")
        assertTrue(profile.containsKey("output_rows"), "output_rows should be present")
        assertTrue(profile.output_row_num != null, "output_row_num should be present")
        assertTrue(profile.output_data_size != null, "output_data_size should be present")
        assertTrue(profile.output_index_size != null, "output_index_size should be present")
        assertTrue(profile.output_total_size != null, "output_total_size should be present")
        assertTrue(profile.output_segments_num != null, "output_segments_num should be present")
        assertTrue(profile.containsKey("merge_latency_ms"), "merge_latency_ms should be present")
        assertTrue(profile.containsKey("bytes_read_from_local"), "bytes_read_from_local should be present")
        assertTrue(profile.containsKey("bytes_read_from_remote"), "bytes_read_from_remote should be present")
        assertTrue(profile.containsKey("peak_memory_bytes"), "peak_memory_bytes should be present")
        assertTrue(profile.containsKey("is_vertical"), "is_vertical should be present")
        assertTrue(profile.containsKey("permits"), "permits should be present")
        assertTrue(profile.containsKey("vertical_total_groups"), "vertical_total_groups should be present")
        assertTrue(profile.containsKey("vertical_completed_groups"), "vertical_completed_groups should be present")

        // Verify field values are reasonable
        assertTrue(profile.compaction_id > 0, "compaction_id should be positive")
        assertTrue(profile.cost_time_ms >= 0, "cost_time_ms should be non-negative")
        assertTrue(profile.input_data_size > 0, "input_data_size should be positive")
        assertTrue(profile.input_rowsets_count > 0, "input_rowsets_count should be positive")
        assertTrue(profile.input_row_num > 0, "input_row_num should be positive")

        // Step 6: Test top_n filter - ?top_n=1 returns exactly 1 record
        def (code2, out2, err2) = curl("GET", baseUrl + "?top_n=1")
        logger.info("top_n=1 query: code=" + code2 + ", out=" + out2)
        assertEquals(0, code2)
        def json2 = parseJson(out2.trim())
        assertEquals("Success", json2.status)
        assertEquals(1, json2.compaction_profiles.size())

        // Step 7: Test tablet_id filter - ?tablet_id=X returns records for that tablet
        def (code3, out3, err3) = curl("GET", baseUrl + "?tablet_id=" + tablet_id)
        logger.info("tablet_id filter query: code=" + code3 + ", out=" + out3)
        assertEquals(0, code3)
        def json3 = parseJson(out3.trim())
        assertEquals("Success", json3.status)
        assertTrue(json3.compaction_profiles.size() > 0, "Should have profiles for tablet " + tablet_id)
        for (def p : json3.compaction_profiles) {
            // NOTE(review): compares a parsed Long against the JSON number as
            // deserialized by the framework — assumes both box to the same
            // numeric type under assertEquals; confirm this is not flaky.
            assertEquals(Long.parseLong(tablet_id), p.tablet_id,
                    "All returned profiles should match the requested tablet_id")
        }

        // Step 8: Test compact_type filter - ?compact_type=cumulative
        def (code4, out4, err4) = curl("GET", baseUrl + "?compact_type=cumulative")
        logger.info("compact_type filter query: code=" + code4 + ", out=" + out4)
        assertEquals(0, code4)
        def json4 = parseJson(out4.trim())
        assertEquals("Success", json4.status)
        assertTrue(json4.compaction_profiles.size() > 0, "Should have cumulative compaction profiles")
        for (def p : json4.compaction_profiles) {
            assertEquals("cumulative", p.compaction_type,
                    "All returned profiles should be cumulative type")
        }

        // Step 9: Test combined filters - ?tablet_id=X&top_n=1
        def (code5, out5, err5) = curl("GET", baseUrl + "?tablet_id=" + tablet_id + "&top_n=1")
        logger.info("Combined filter query: code=" + code5 + ", out=" + out5)
        assertEquals(0, code5)
        def json5 = parseJson(out5.trim())
        assertEquals("Success", json5.status)
        assertTrue(json5.compaction_profiles.size() <= 1, "top_n=1 should return at most 1 record")
        if (json5.compaction_profiles.size() > 0) {
            assertEquals(Long.parseLong(tablet_id), json5.compaction_profiles[0].tablet_id,
                    "Returned profile should match the requested tablet_id")
        }

        // Step 10: Test invalid params - ?top_n=-1 returns error
        // The HTTP request still completes (curl exit code 0); the rejection is
        // expected in the application-level response body.
        def (code6, out6, err6) = curl("GET", baseUrl + "?top_n=-1")
        logger.info("Invalid top_n query: code=" + code6 + ", out=" + out6)
        assertEquals(0, code6)
        def json6 = parseJson(out6.trim())
        // Server should return an error status for invalid top_n
        assertTrue(json6.status != "Success" || out6.contains("top_n must be non-negative"),
                "Invalid top_n=-1 should return error response")

        // Step 11: Test non-existent tablet - returns empty array
        def (code7, out7, err7) = curl("GET", baseUrl + "?tablet_id=999999999")
        logger.info("Non-existent tablet query: code=" + code7 + ", out=" + out7)
        assertEquals(0, code7)
        def json7 = parseJson(out7.trim())
        assertEquals("Success", json7.status)
        assertEquals(0, json7.compaction_profiles.size())

    } finally {
        // Step 12: Cleanup
        // try_sql swallows errors so cleanup failure does not mask a real
        // test failure raised in the try block.
        try_sql("DROP TABLE IF EXISTS ${tableName} FORCE")
    }
}