diff --git a/be/src/cloud/cloud_base_compaction.h b/be/src/cloud/cloud_base_compaction.h index c89047b919beb3..9fda6cbe12f879 100644 --- a/be/src/cloud/cloud_base_compaction.h +++ b/be/src/cloud/cloud_base_compaction.h @@ -40,6 +40,8 @@ class CloudBaseCompaction : public CloudCompactionMixin { Status pick_rowsets_to_compact(); std::string_view compaction_name() const override { return "CloudBaseCompaction"; } + CompactionProfileType profile_type() const override { return CompactionProfileType::BASE; } + int64_t input_segments_num() const override { return _input_segments; } Status modify_rowsets() override; diff --git a/be/src/cloud/cloud_cumulative_compaction.h b/be/src/cloud/cloud_cumulative_compaction.h index 174d0d57a97cc7..7e694360d798e9 100644 --- a/be/src/cloud/cloud_cumulative_compaction.h +++ b/be/src/cloud/cloud_cumulative_compaction.h @@ -45,6 +45,10 @@ class CloudCumulativeCompaction : public CloudCompactionMixin { Status pick_rowsets_to_compact(); std::string_view compaction_name() const override { return "CloudCumulativeCompaction"; } + CompactionProfileType profile_type() const override { + return CompactionProfileType::CUMULATIVE; + } + int64_t input_segments_num() const override { return _input_segments; } Status modify_rowsets() override; diff --git a/be/src/cloud/cloud_full_compaction.h b/be/src/cloud/cloud_full_compaction.h index e5c440e52b9b8a..ae23bb3884d7cd 100644 --- a/be/src/cloud/cloud_full_compaction.h +++ b/be/src/cloud/cloud_full_compaction.h @@ -41,6 +41,8 @@ class CloudFullCompaction : public CloudCompactionMixin { Status pick_rowsets_to_compact(); std::string_view compaction_name() const override { return "CloudFullCompaction"; } + CompactionProfileType profile_type() const override { return CompactionProfileType::FULL; } + int64_t input_segments_num() const override { return _input_segments; } Status modify_rowsets() override; Status garbage_collection() override; diff --git a/be/src/cloud/cloud_index_change_compaction.h b/be/src/cloud/cloud_index_change_compaction.h index e0bd7952ca7568..1ac033b01ba5d1 100644 --- a/be/src/cloud/cloud_index_change_compaction.h +++ b/be/src/cloud/cloud_index_change_compaction.h @@ -55,6 +55,10 @@ class CloudIndexChangeCompaction : public CloudCompactionMixin { protected: std::string_view compaction_name() const override { return "CloudIndexChangeCompaction"; } + CompactionProfileType profile_type() const override { + return CompactionProfileType::INDEX_CHANGE; + } + int64_t input_segments_num() const override { return _input_segments; } // if cumu rowset is modified, cumu compaction should sync rowset before execute. // if base rowset is modified, base compaction should sync rowset before execute. diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 8eae149c9634a6..fed4e5cbc04d96 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -436,6 +436,8 @@ DEFINE_Bool(enable_low_cardinality_cache_code, "true"); DEFINE_mBool(enable_compaction_checksum, "false"); // whether disable automatic compaction task DEFINE_mBool(disable_auto_compaction, "false"); +// max number of compaction profile records to keep in memory, 0 to disable +DEFINE_mInt32(compaction_profile_max_records, "500"); // whether enable vertical compaction DEFINE_mBool(enable_vertical_compaction, "true"); // whether enable ordered data compaction diff --git a/be/src/common/config.h b/be/src/common/config.h index 43be7f9e37a542..ad5c3d678e37e3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -501,6 +501,8 @@ DECLARE_Bool(enable_low_cardinality_cache_code); DECLARE_mBool(enable_compaction_checksum); // whether disable automatic compaction task DECLARE_mBool(disable_auto_compaction); +// max number of compaction profile records to keep in memory, 0 to disable +DECLARE_mInt32(compaction_profile_max_records); // whether enable vertical compaction DECLARE_mBool(enable_vertical_compaction); // whether enable ordered data compaction diff --git a/be/src/service/http/action/compaction_profile_action.cpp b/be/src/service/http/action/compaction_profile_action.cpp new file mode 100644 index 00000000000000..83752d29e894f4 --- /dev/null +++ b/be/src/service/http/action/compaction_profile_action.cpp @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "service/http/action/compaction_profile_action.h" + +#include +#include +#include + +#include +#include + +#include "service/http/http_channel.h" +#include "service/http/http_headers.h" +#include "service/http/http_request.h" +#include "service/http/http_status.h" +#include "storage/compaction/compaction_profile_mgr.h" + +namespace doris { + +CompactionProfileAction::CompactionProfileAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type ptype) + : HttpHandlerWithAuth(exec_env, hier, ptype) {} + +void CompactionProfileAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, "application/json"); + + int64_t tablet_id = 0; + int64_t top_n = 0; + + const auto& tablet_id_str = req->param("tablet_id"); + if (!tablet_id_str.empty()) { + try { + tablet_id = std::stoll(tablet_id_str); + } catch (const std::exception&) { + HttpChannel::send_reply( + req, HttpStatus::BAD_REQUEST, + R"({"status": "Failed", "msg": "invalid tablet_id parameter"})"); + return; + } + } + + const auto& top_n_str = req->param("top_n"); + if (!top_n_str.empty()) { + try { + top_n = std::stoll(top_n_str); + } catch (const std::exception&) { + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, + R"({"status": "Failed", "msg": "invalid top_n parameter"})"); + return; + } + if (top_n < 0) { + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, + R"({"status": "Failed", "msg": "top_n must be non-negative"})"); + return; + } + } + + auto records = CompactionProfileManager::instance()->get_records(tablet_id, top_n); + + rapidjson::Document root; + root.SetObject(); + auto& allocator = root.GetAllocator(); + + root.AddMember("status", "Success", allocator); + + rapidjson::Value profiles(rapidjson::kArrayType); + for (const auto& record : records) { + rapidjson::Value obj; + record.to_json(obj, allocator); + profiles.PushBack(obj, allocator); + } + root.AddMember("compaction_profiles", profiles, allocator); + + rapidjson::StringBuffer str_buf; + rapidjson::PrettyWriter writer(str_buf); + root.Accept(writer); + + HttpChannel::send_reply(req, HttpStatus::OK, str_buf.GetString()); +} + +} // namespace doris diff --git a/be/src/service/http/action/compaction_profile_action.h b/be/src/service/http/action/compaction_profile_action.h new file mode 100644 index 00000000000000..be767355a65975 --- /dev/null +++ b/be/src/service/http/action/compaction_profile_action.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "service/http/http_handler_with_auth.h" + +namespace doris { +class HttpRequest; +class ExecEnv; + +class CompactionProfileAction : public HttpHandlerWithAuth { +public: + CompactionProfileAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type ptype); + ~CompactionProfileAction() override = default; + + void handle(HttpRequest* req) override; +}; + +} // namespace doris diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index f97c5ebde5ac09..9b242f59891dd3 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -43,6 +43,7 @@ #include "service/http/action/checksum_action.h" #include "service/http/action/clear_cache_action.h" #include "service/http/action/compaction_action.h" +#include "service/http/action/compaction_profile_action.h" #include "service/http/action/compaction_score_action.h" #include "service/http/action/config_action.h" #include "service/http/action/debug_point_action.h" @@ -280,6 +281,11 @@ Status HttpService::start() { ShrinkMemAction* shrink_mem_action = _pool.add(new ShrinkMemAction(_env)); _ev_http_server->register_handler(HttpMethod::GET, "/api/shrink_mem", shrink_mem_action); + CompactionProfileAction* compaction_profile_action = _pool.add( + new CompactionProfileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/profile", + compaction_profile_action); + #ifndef BE_TEST auto& engine = _env->storage_engine(); if (config::is_cloud_mode()) { diff --git a/be/src/storage/compaction/base_compaction.h b/be/src/storage/compaction/base_compaction.h index 453583f8227abf..c01fc5e16e154c 100644 --- a/be/src/storage/compaction/base_compaction.h +++ b/be/src/storage/compaction/base_compaction.h @@ -43,6 +43,7 @@ class BaseCompaction final : public CompactionMixin { private: Status pick_rowsets_to_compact(); std::string_view compaction_name() const override { return "base compaction"; } + CompactionProfileType profile_type() const override { return CompactionProfileType::BASE; } ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; } diff --git a/be/src/storage/compaction/cold_data_compaction.h b/be/src/storage/compaction/cold_data_compaction.h index 94ee993a3060d5..bcd890078754b9 100644 --- a/be/src/storage/compaction/cold_data_compaction.h +++ b/be/src/storage/compaction/cold_data_compaction.h @@ -35,6 +35,7 @@ class ColdDataCompaction final : public CompactionMixin { private: std::string_view compaction_name() const override { return "cold data compaction"; } + CompactionProfileType profile_type() const override { return CompactionProfileType::COLD_DATA; } ReaderType compaction_type() const override { return ReaderType::READER_COLD_DATA_COMPACTION; } Status construct_output_rowset_writer(RowsetWriterContext& ctx) override; diff --git a/be/src/storage/compaction/compaction.cpp b/be/src/storage/compaction/compaction.cpp index 698c81f7849fd0..a75c01df5f932f 100644 --- a/be/src/storage/compaction/compaction.cpp +++ b/be/src/storage/compaction/compaction.cpp @@ -157,6 +157,7 @@ Compaction::Compaction(BaseTabletSPtr tablet, const std::string& label) : _mem_tracker( MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, label)), _tablet(std::move(tablet)), + _compaction_id(CompactionProfileManager::instance()->next_compaction_id()), _is_vertical(config::enable_vertical_compaction), _allow_delete_in_cumu_compaction(config::enable_delete_when_cumu_compaction), _enable_vertical_compact_variant_subcolumns( @@ -177,6 +178,46 @@ Compaction::~Compaction() { _rowid_conversion.reset(); } +void Compaction::submit_profile_record(bool success, int64_t start_time_ms, + const std::string& status_msg) { + CompactionProfileRecord record; + record.compaction_id = _compaction_id; + record.compaction_type = profile_type(); + record.tablet_id = _tablet->tablet_id(); + record.start_time_ms = start_time_ms; + record.end_time_ms = UnixMillis(); + record.cost_time_ms = record.end_time_ms - record.start_time_ms; + record.success = success; + record.status_msg = status_msg; + + record.input_rowsets_data_size = _input_rowsets_data_size; + record.input_rowsets_count = static_cast(_input_rowsets.size()); + record.input_row_num = _input_row_num; + record.input_segments_num = input_segments_num(); + record.input_rowsets_index_size = _input_rowsets_index_size; + record.input_rowsets_total_size = _input_rowsets_total_size; + + record.merged_rows = _stats.merged_rows; + record.filtered_rows = _stats.filtered_rows; + record.output_rows = _stats.output_rows; + record.bytes_read_from_local = _stats.bytes_read_from_local; + record.bytes_read_from_remote = _stats.bytes_read_from_remote; + + record.merge_rowsets_latency_ns = + _merge_rowsets_latency_timer ? _merge_rowsets_latency_timer->value() : 0; + + if (_output_rowset) { + record.output_rowset_data_size = _output_rowset->data_disk_size(); + record.output_row_num = _output_rowset->num_rows(); + record.output_segments_num = _output_rowset->num_segments(); + record.output_rowset_index_size = _output_rowset->index_disk_size(); + record.output_rowset_total_size = _output_rowset->total_disk_size(); + record.output_version = _output_version.to_string(); + } + + CompactionProfileManager::instance()->add_record(std::move(record)); +} + void Compaction::init_profile(const std::string& label) { _profile = std::make_unique(label); @@ -533,13 +574,19 @@ bool CompactionMixin::handle_ordered_data_compaction() { } Status CompactionMixin::execute_compact() { + int64_t profile_start_time_ms = UnixMillis(); + uint32_t checksum_before; uint32_t checksum_after; bool enable_compaction_checksum = config::enable_compaction_checksum; if (enable_compaction_checksum) { EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), _input_rowsets.back()->end_version(), &checksum_before); - RETURN_IF_ERROR(checksum_task.execute()); + auto checksum_before_st = checksum_task.execute(); + if (!checksum_before_st.ok()) { + submit_profile_record(false, profile_start_time_ms, checksum_before_st.to_string()); + return checksum_before_st; + } } auto* data_dir = tablet()->data_dir(); @@ -553,20 +600,52 @@ Status CompactionMixin::execute_compact() { data_dir->disks_compaction_num_increment(-1); }; - HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(execute_compact_impl(permits), record_compaction_stats); + // Inline HANDLE_EXCEPTION_IF_CATCH_EXCEPTION to capture failure status for profile recording + Status impl_status; + try { + doris::enable_thread_catch_bad_alloc++; + Defer alloc_defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; + impl_status = execute_compact_impl(permits); + if (UNLIKELY(!impl_status.ok())) { + record_compaction_stats(doris::Exception()); + } + } catch (const doris::Exception& e) { + record_compaction_stats(e); + if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { + impl_status = Status::MemoryLimitExceeded(fmt::format( + "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", + e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); + } else { + impl_status = e.to_status(); + } + } + + if (!impl_status.ok()) { + submit_profile_record(false, profile_start_time_ms, impl_status.to_string()); + return impl_status; + } + record_compaction_stats(doris::Exception()); if (enable_compaction_checksum) { EngineChecksumTask checksum_task(_engine, _tablet->tablet_id(), _tablet->schema_hash(), _input_rowsets.back()->end_version(), &checksum_after); - RETURN_IF_ERROR(checksum_task.execute()); + auto checksum_st = checksum_task.execute(); + if (!checksum_st.ok()) { + submit_profile_record(false, profile_start_time_ms, checksum_st.to_string()); + return checksum_st; + } if (checksum_before != checksum_after) { - return Status::InternalError( + auto st = Status::InternalError( "compaction tablet checksum not consistent, before={}, after={}, tablet_id={}", checksum_before, checksum_after, _tablet->tablet_id()); + submit_profile_record(false, profile_start_time_ms, st.to_string()); + return st; } } + submit_profile_record(true, profile_start_time_ms); + DorisMetrics::instance()->local_compaction_read_rows_total->increment(_input_row_num); DorisMetrics::instance()->local_compaction_read_bytes_total->increment( _input_rowsets_total_size); @@ -1672,21 +1751,45 @@ size_t CloudCompactionMixin::apply_txn_size_truncation_and_log(const std::string Status CloudCompactionMixin::execute_compact() { TEST_INJECTION_POINT("Compaction::do_compaction"); + int64_t profile_start_time_ms = UnixMillis(); int64_t permits = get_compaction_permits(); - HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( - execute_compact_impl(permits), [&](const doris::Exception& ex) { - auto st = garbage_collection(); - if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write() && !st.ok()) { - // if compaction fail, be will try to abort compaction, and delete bitmap lock - // will release if abort job successfully, but if abort failed, delete bitmap - // lock will not release, in this situation, be need to send this rpc to ms - // to try to release delete bitmap lock. - _engine.meta_mgr().remove_delete_bitmap_update_lock( - _tablet->table_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator(), - _tablet->tablet_id()); - } - }); + + auto cloud_exception_handler = [&](const doris::Exception& ex) { + auto st = garbage_collection(); + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write() && !st.ok()) { + _engine.meta_mgr().remove_delete_bitmap_update_lock(_tablet->table_id(), + COMPACTION_DELETE_BITMAP_LOCK_ID, + initiator(), _tablet->tablet_id()); + } + }; + + // Inline HANDLE_EXCEPTION_IF_CATCH_EXCEPTION to capture failure status for profile recording + Status impl_status; + try { + doris::enable_thread_catch_bad_alloc++; + Defer alloc_defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; + impl_status = execute_compact_impl(permits); + if (UNLIKELY(!impl_status.ok())) { + cloud_exception_handler(doris::Exception()); + } + } catch (const doris::Exception& e) { + cloud_exception_handler(e); + if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { + impl_status = Status::MemoryLimitExceeded(fmt::format( + "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", + e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); + } else { + impl_status = e.to_status(); + } + } + + if (!impl_status.ok()) { + submit_profile_record(false, profile_start_time_ms, impl_status.to_string()); + return impl_status; + } + + submit_profile_record(true, profile_start_time_ms); DorisMetrics::instance()->remote_compaction_read_rows_total->increment(_input_row_num); DorisMetrics::instance()->remote_compaction_write_rows_total->increment( diff --git a/be/src/storage/compaction/compaction.h b/be/src/storage/compaction/compaction.h index 772c8b30aa4172..93b71653da1dda 100644 --- a/be/src/storage/compaction/compaction.h +++ b/be/src/storage/compaction/compaction.h @@ -35,6 +35,7 @@ #include "common/status.h" #include "io/io_common.h" #include "runtime/runtime_profile.h" +#include "storage/compaction/compaction_profile_mgr.h" #include "storage/merger.h" #include "storage/olap_common.h" #include "storage/rowid_conversion.h" @@ -74,6 +75,8 @@ class Compaction { virtual ReaderType compaction_type() const = 0; virtual std::string_view compaction_name() const = 0; + virtual CompactionProfileType profile_type() const = 0; + virtual int64_t input_segments_num() const { return _input_num_segments; } // the difference between index change compmaction and other compaction. // 1. delete predicate should be kept when input is cumu rowset. @@ -110,6 +113,9 @@ class Compaction { virtual Status update_delete_bitmap() = 0; + void submit_profile_record(bool success, int64_t start_time_ms, + const std::string& status_msg = ""); + // the root tracker for this compaction std::shared_ptr _mem_tracker; @@ -121,6 +127,7 @@ class Compaction { int64_t _input_rowsets_total_size {0}; int64_t _input_row_num {0}; int64_t _input_num_segments {0}; + int64_t _compaction_id {0}; int64_t _local_read_bytes_total {}; int64_t _remote_read_bytes_total {}; diff --git a/be/src/storage/compaction/compaction_profile_mgr.cpp b/be/src/storage/compaction/compaction_profile_mgr.cpp new file mode 100644 index 00000000000000..e3400320ba2147 --- /dev/null +++ b/be/src/storage/compaction/compaction_profile_mgr.cpp @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/compaction/compaction_profile_mgr.h" + +#include + +#include "common/config.h" + +namespace doris { + +const char* to_string(CompactionProfileType type) { + switch (type) { + case CompactionProfileType::BASE: + return "base"; + case CompactionProfileType::CUMULATIVE: + return "cumulative"; + case CompactionProfileType::FULL: + return "full"; + case CompactionProfileType::SINGLE_REPLICA: + return "single_replica"; + case CompactionProfileType::COLD_DATA: + return "cold_data"; + case CompactionProfileType::INDEX_CHANGE: + return "index_change"; + } + return "unknown"; +} + +namespace { + +void add_time_string(rapidjson::Value& obj, const char* key, int64_t time_ms, + rapidjson::Document::AllocatorType& allocator) { + time_t seconds = time_ms / 1000; + struct tm tm_buf; + localtime_r(&seconds, &tm_buf); + char buf[32]; + strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm_buf); + obj.AddMember(rapidjson::Value(key, allocator), rapidjson::Value(buf, allocator), allocator); +} + +} // namespace + +void CompactionProfileRecord::to_json(rapidjson::Value& obj, + rapidjson::Document::AllocatorType& allocator) const { + obj.SetObject(); + obj.AddMember("compaction_id", compaction_id, allocator); + obj.AddMember("compaction_type", rapidjson::Value(doris::to_string(compaction_type), allocator), + allocator); + obj.AddMember("tablet_id", tablet_id, allocator); + add_time_string(obj, "start_time", start_time_ms, allocator); + add_time_string(obj, "end_time", end_time_ms, allocator); + obj.AddMember("cost_time_ms", cost_time_ms, allocator); + obj.AddMember("success", success, allocator); + if (!success) { + obj.AddMember("status_msg", rapidjson::Value(status_msg.c_str(), allocator), allocator); + } + + obj.AddMember("input_rowsets_data_size", input_rowsets_data_size, allocator); + obj.AddMember("input_rowsets_count", input_rowsets_count, allocator); + obj.AddMember("input_row_num", input_row_num, allocator); + obj.AddMember("input_segments_num", input_segments_num, allocator); + obj.AddMember("input_rowsets_index_size", input_rowsets_index_size, allocator); + obj.AddMember("input_rowsets_total_size", input_rowsets_total_size, allocator); + + obj.AddMember("merged_rows", merged_rows, allocator); + obj.AddMember("filtered_rows", filtered_rows, allocator); + obj.AddMember("output_rows", output_rows, allocator); + + obj.AddMember("output_rowset_data_size", output_rowset_data_size, allocator); + obj.AddMember("output_row_num", output_row_num, allocator); + obj.AddMember("output_segments_num", output_segments_num, allocator); + obj.AddMember("output_rowset_index_size", output_rowset_index_size, allocator); + obj.AddMember("output_rowset_total_size", output_rowset_total_size, allocator); + + obj.AddMember("merge_rowsets_latency_ms", merge_rowsets_latency_ns / 1000000, allocator); + + obj.AddMember("bytes_read_from_local", bytes_read_from_local, allocator); + obj.AddMember("bytes_read_from_remote", bytes_read_from_remote, allocator); + + if (!output_version.empty()) { + obj.AddMember("output_version", rapidjson::Value(output_version.c_str(), allocator), + allocator); + } +} + +CompactionProfileManager* CompactionProfileManager::instance() { + static CompactionProfileManager s_instance; + return &s_instance; +} + +void CompactionProfileManager::_trim_locked() { + int32_t max = config::compaction_profile_max_records; + if (max <= 0) { + _records.clear(); + return; + } + while (static_cast(_records.size()) > max) { + _records.pop_front(); + } +} + +void CompactionProfileManager::add_record(CompactionProfileRecord record) { + std::unique_lock lock(_mutex); + + if (config::compaction_profile_max_records <= 0) { + _trim_locked(); + return; + } + + _records.push_back(std::move(record)); + _trim_locked(); +} + +std::vector CompactionProfileManager::get_records(int64_t tablet_id, + int64_t top_n) { + int32_t max = config::compaction_profile_max_records; + if (max <= 0) { + std::unique_lock lock(_mutex); + _trim_locked(); + return {}; + } + + std::shared_lock lock(_mutex); + std::vector result; + int32_t count = 0; + for (auto it = _records.rbegin(); it != _records.rend(); ++it) { + if (count >= max) { + break; + } + count++; + if (tablet_id != 0 && it->tablet_id != tablet_id) { + continue; + } + result.push_back(*it); + if (top_n > 0 && static_cast(result.size()) >= top_n) { + break; + } + } + return result; +} + +} // namespace doris diff --git a/be/src/storage/compaction/compaction_profile_mgr.h b/be/src/storage/compaction/compaction_profile_mgr.h new file mode 100644 index 00000000000000..2d14a99a6d5448 --- /dev/null +++ b/be/src/storage/compaction/compaction_profile_mgr.h @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include + +namespace doris { + +enum class CompactionProfileType : uint8_t { + BASE = 0, + CUMULATIVE = 1, + FULL = 2, + SINGLE_REPLICA = 3, + COLD_DATA = 4, + INDEX_CHANGE = 5, +}; + +const char* to_string(CompactionProfileType type); + +struct CompactionProfileRecord { + int64_t compaction_id {0}; + CompactionProfileType compaction_type {CompactionProfileType::BASE}; + int64_t tablet_id {0}; + int64_t start_time_ms {0}; + int64_t end_time_ms {0}; + int64_t cost_time_ms {0}; + bool success {false}; + std::string status_msg; + + // input stats (from member variables, available in all paths) + int64_t input_rowsets_data_size {0}; + int64_t input_rowsets_count {0}; + int64_t input_row_num {0}; + int64_t input_segments_num {0}; + int64_t input_rowsets_index_size {0}; + int64_t input_rowsets_total_size {0}; + + // merge stats (from Merger::Statistics, updated in merge path) + int64_t merged_rows {0}; + int64_t filtered_rows {0}; + int64_t output_rows {0}; + + // output rowset stats (available when _output_rowset is built) + int64_t output_rowset_data_size {0}; + int64_t output_row_num {0}; + int64_t output_segments_num {0}; + int64_t output_rowset_index_size {0}; + int64_t output_rowset_total_size {0}; + + // timer (from RuntimeProfile, updated in merge path, 0 for ordered path) + int64_t merge_rowsets_latency_ns {0}; + + // IO stats + int64_t bytes_read_from_local {0}; + int64_t bytes_read_from_remote {0}; + + // version info + std::string output_version; + + void to_json(rapidjson::Value& obj, rapidjson::Document::AllocatorType& allocator) const; +}; + +class CompactionProfileManager { +public: + static CompactionProfileManager* instance(); + + int64_t next_compaction_id() { return _next_id.fetch_add(1, std::memory_order_relaxed); } + + void add_record(CompactionProfileRecord record); + + // Non-const: when config=0, upgrades to write lock to clear stale records. + std::vector get_records(int64_t tablet_id = 0, int64_t top_n = 0); + +private: + CompactionProfileManager() = default; + + void _trim_locked(); + + std::atomic _next_id {1}; + + std::shared_mutex _mutex; + std::deque _records; +}; + +} // namespace doris diff --git a/be/src/storage/compaction/cumulative_compaction.h b/be/src/storage/compaction/cumulative_compaction.h index 9e5bbbfcfb5241..b4f31f86d635d6 100644 --- a/be/src/storage/compaction/cumulative_compaction.h +++ b/be/src/storage/compaction/cumulative_compaction.h @@ -39,6 +39,9 @@ class CumulativeCompaction final : public CompactionMixin { private: std::string_view compaction_name() const override { return "cumulative compaction"; } + CompactionProfileType profile_type() const override { + return CompactionProfileType::CUMULATIVE; + } ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } diff --git a/be/src/storage/compaction/full_compaction.h b/be/src/storage/compaction/full_compaction.h index fb80613f722db6..6cf62e4e23065e 100644 --- a/be/src/storage/compaction/full_compaction.h +++ b/be/src/storage/compaction/full_compaction.h @@ -44,6 +44,7 @@ class FullCompaction final : public CompactionMixin { Status modify_rowsets() override; std::string_view compaction_name() const override { return "full compaction"; } + CompactionProfileType profile_type() const override { return CompactionProfileType::FULL; } ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; } diff --git a/be/src/storage/compaction/single_replica_compaction.cpp b/be/src/storage/compaction/single_replica_compaction.cpp index a0b36cd417851f..d8799fa7efc201 100644 --- a/be/src/storage/compaction/single_replica_compaction.cpp +++ b/be/src/storage/compaction/single_replica_compaction.cpp @@ -41,6 +41,7 @@ #include "util/client_cache.h" #include "util/security.h" #include "util/thrift_rpc_helper.h" +#include "util/time.h" #include "util/trace.h" namespace doris { @@ -67,6 +68,7 @@ Status SingleReplicaCompaction::prepare_compact() { } Status SingleReplicaCompaction::execute_compact() { + // Pre-checks: early return without profile recording (intentionally excluded) if (!tablet()->should_fetch_from_peer()) { return Status::Aborted("compaction should be performed locally"); } @@ -83,10 +85,15 @@ Status SingleReplicaCompaction::execute_compact() { "another base compaction is running. tablet={}", _tablet->tablet_id()); } + // Pre-checks passed, start timing for profile recording + int64_t profile_start_time_ms = UnixMillis(); + SCOPED_ATTACH_TASK(_mem_tracker); // do single replica compaction - RETURN_IF_ERROR(_do_single_replica_compaction()); + Status st = _do_single_replica_compaction(); + submit_profile_record(st.ok(), profile_start_time_ms, st.ok() ? "" : st.to_string()); + RETURN_IF_ERROR(st); _state = CompactionState::SUCCESS; diff --git a/be/src/storage/compaction/single_replica_compaction.h b/be/src/storage/compaction/single_replica_compaction.h index c4f4ee0b15e55e..e339431b486ed2 100644 --- a/be/src/storage/compaction/single_replica_compaction.h +++ b/be/src/storage/compaction/single_replica_compaction.h @@ -41,6 +41,9 @@ class SingleReplicaCompaction final : public CompactionMixin { protected: std::string_view compaction_name() const override { return "single replica compaction"; } + CompactionProfileType profile_type() const override { + return CompactionProfileType::SINGLE_REPLICA; + } ReaderType compaction_type() const override { return (_compaction_type == CompactionType::CUMULATIVE_COMPACTION) ? ReaderType::READER_CUMULATIVE_COMPACTION diff --git a/be/test/cloud/cloud_compaction_test.cpp b/be/test/cloud/cloud_compaction_test.cpp index 05eec3149f996a..846e7c0cb84370 100644 --- a/be/test/cloud/cloud_compaction_test.cpp +++ b/be/test/cloud/cloud_compaction_test.cpp @@ -239,6 +239,8 @@ class TestableCloudCompaction : public CloudCompactionMixin { ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } std::string_view compaction_name() const override { return "test_compaction"; } + + CompactionProfileType profile_type() const override { return CompactionProfileType::BASE; } }; TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) { diff --git a/be/test/storage/compaction/compaction_profile_mgr_test.cpp b/be/test/storage/compaction/compaction_profile_mgr_test.cpp new file mode 100644 index 00000000000000..e2549a4841a785 --- /dev/null +++ b/be/test/storage/compaction/compaction_profile_mgr_test.cpp @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/compaction/compaction_profile_mgr.h" + +#include + +#include +#include + +#include "common/config.h" + +namespace doris { + +class CompactionProfileMgrTest : public testing::Test { +public: + void SetUp() override { + _saved_max_records = config::compaction_profile_max_records; + config::compaction_profile_max_records = 100; + // Clear any residual records from other tests + auto* mgr = CompactionProfileManager::instance(); + config::compaction_profile_max_records = 0; + mgr->get_records(); // triggers clear via write lock + config::compaction_profile_max_records = 100; + } + + void TearDown() override { config::compaction_profile_max_records = _saved_max_records; } + +protected: + static CompactionProfileRecord make_record(int64_t tablet_id, CompactionProfileType type, + bool success = true) { + auto* mgr = CompactionProfileManager::instance(); + CompactionProfileRecord r; + r.compaction_id = mgr->next_compaction_id(); + r.compaction_type = type; + r.tablet_id = tablet_id; + r.start_time_ms = 1000; + r.end_time_ms = 1100; + r.cost_time_ms = 100; + r.success = success; + r.input_rowsets_data_size = 1024; + r.input_rowsets_count = 3; + r.input_row_num = 500; + r.input_segments_num = 5; + r.output_rowset_data_size = 512; + r.output_row_num = 490; + r.output_segments_num = 1; + r.output_version = "[0-3]"; + if (!success) { + r.status_msg = "test failure"; + } + return r; + } + + int32_t _saved_max_records = 0; +}; + +TEST_F(CompactionProfileMgrTest, BasicAddAndGet) { + auto* mgr = CompactionProfileManager::instance(); + mgr->add_record(make_record(100, CompactionProfileType::BASE)); + mgr->add_record(make_record(200, CompactionProfileType::CUMULATIVE)); + + auto records = mgr->get_records(); + ASSERT_EQ(records.size(), 2); + // Most recent first + EXPECT_EQ(records[0].tablet_id, 200); + EXPECT_EQ(records[1].tablet_id, 100); + EXPECT_EQ(to_string(records[0].compaction_type), std::string("cumulative")); + EXPECT_EQ(to_string(records[1].compaction_type), std::string("base")); +} + +TEST_F(CompactionProfileMgrTest, FilterByTabletId) { + auto* mgr = CompactionProfileManager::instance(); + mgr->add_record(make_record(100, CompactionProfileType::BASE)); + mgr->add_record(make_record(200, CompactionProfileType::CUMULATIVE)); + mgr->add_record(make_record(100, CompactionProfileType::FULL)); + mgr->add_record(make_record(300, CompactionProfileType::BASE)); + + auto records = mgr->get_records(100); + ASSERT_EQ(records.size(), 2); + EXPECT_EQ(records[0].tablet_id, 100); + EXPECT_EQ(records[1].tablet_id, 100); + + records = mgr->get_records(999); + EXPECT_TRUE(records.empty()); +} + +TEST_F(CompactionProfileMgrTest, TopN) { + auto* mgr = CompactionProfileManager::instance(); + for (int i = 0; i < 20; i++) { + mgr->add_record(make_record(100 + i, CompactionProfileType::BASE)); + } + + auto records = mgr->get_records(0, 5); + ASSERT_EQ(records.size(), 5); + // Most recent first + EXPECT_EQ(records[0].tablet_id, 119); + EXPECT_EQ(records[4].tablet_id, 115); +} + +TEST_F(CompactionProfileMgrTest, TrimOldRecords) { + auto* mgr = CompactionProfileManager::instance(); + config::compaction_profile_max_records = 10; + + for (int i = 0; i < 20; i++) { + mgr->add_record(make_record(i, CompactionProfileType::CUMULATIVE)); + } + + auto records = mgr->get_records(); + ASSERT_EQ(records.size(), 10); + // Only the 10 most recent remain (tablet_id 10..19) + EXPECT_EQ(records[0].tablet_id, 19); + EXPECT_EQ(records[9].tablet_id, 10); +} + +TEST_F(CompactionProfileMgrTest, DynamicConfigDisable) { + auto* mgr = CompactionProfileManager::instance(); + mgr->add_record(make_record(100, CompactionProfileType::BASE)); + mgr->add_record(make_record(200, CompactionProfileType::BASE)); + + ASSERT_EQ(mgr->get_records().size(), 2); + + // Disable: get_records returns empty and clears deque + config::compaction_profile_max_records = 0; + auto records = mgr->get_records(); + EXPECT_TRUE(records.empty()); + + // New records should not be stored + mgr->add_record(make_record(300, CompactionProfileType::BASE)); + records = mgr->get_records(); + EXPECT_TRUE(records.empty()); +} + +TEST_F(CompactionProfileMgrTest, DynamicConfigShrink) { + auto* mgr = CompactionProfileManager::instance(); + config::compaction_profile_max_records = 50; + + for (int i = 0; i < 50; i++) { + mgr->add_record(make_record(i, CompactionProfileType::BASE)); + } + ASSERT_EQ(mgr->get_records().size(), 50); + + // Shrink to 10 + config::compaction_profile_max_records = 10; + auto records = mgr->get_records(); + // get_records limits by current config + EXPECT_LE(static_cast(records.size()), 10); + + // After add, physical trim happens + mgr->add_record(make_record(999, CompactionProfileType::BASE)); + records = mgr->get_records(); + ASSERT_EQ(records.size(), 10); + EXPECT_EQ(records[0].tablet_id, 999); +} + +TEST_F(CompactionProfileMgrTest, DynamicConfigRestore) { + auto* mgr = CompactionProfileManager::instance(); + + // Add some records + mgr->add_record(make_record(100, CompactionProfileType::BASE)); + mgr->add_record(make_record(200, CompactionProfileType::BASE)); + ASSERT_EQ(mgr->get_records().size(), 2); + + // Disable: clears old records + config::compaction_profile_max_records = 0; + mgr->get_records(); // trigger clear + + // Restore + config::compaction_profile_max_records = 100; + auto records = mgr->get_records(); + // Old records should not reappear + EXPECT_TRUE(records.empty()); + + // New records work + mgr->add_record(make_record(300, CompactionProfileType::BASE)); + records = mgr->get_records(); + ASSERT_EQ(records.size(), 1); + EXPECT_EQ(records[0].tablet_id, 300); +} + +TEST_F(CompactionProfileMgrTest, FailedRecordFields) { + auto* mgr = CompactionProfileManager::instance(); + auto record = make_record(100, CompactionProfileType::FULL, false); + record.output_rowset_data_size = 512; + record.output_version = "[0-5]"; + mgr->add_record(std::move(record)); + + auto records = mgr->get_records(); + ASSERT_EQ(records.size(), 1); + EXPECT_FALSE(records[0].success); + EXPECT_EQ(records[0].status_msg, "test failure"); + // Output fields preserved even on failure + EXPECT_EQ(records[0].output_rowset_data_size, 512); + EXPECT_EQ(records[0].output_version, "[0-5]"); +} + +TEST_F(CompactionProfileMgrTest, ToJson) { + auto* mgr = CompactionProfileManager::instance(); + mgr->add_record(make_record(100, CompactionProfileType::BASE)); + + auto records = mgr->get_records(); + ASSERT_EQ(records.size(), 1); + + rapidjson::Document doc; + rapidjson::Value obj; + records[0].to_json(obj, doc.GetAllocator()); + + EXPECT_TRUE(obj.HasMember("compaction_id")); + EXPECT_TRUE(obj.HasMember("compaction_type")); + EXPECT_STREQ(obj["compaction_type"].GetString(), "base"); + EXPECT_EQ(obj["tablet_id"].GetInt64(), 100); + EXPECT_TRUE(obj["success"].GetBool()); + EXPECT_TRUE(obj.HasMember("start_time")); + EXPECT_TRUE(obj.HasMember("end_time")); + EXPECT_EQ(obj["cost_time_ms"].GetInt64(), 100); + EXPECT_EQ(obj["input_rowsets_data_size"].GetInt64(), 1024); + EXPECT_EQ(obj["output_rowset_data_size"].GetInt64(), 512); + EXPECT_STREQ(obj["output_version"].GetString(), "[0-3]"); +} + +TEST_F(CompactionProfileMgrTest, AllProfileTypes) { + EXPECT_STREQ(to_string(CompactionProfileType::BASE), "base"); + EXPECT_STREQ(to_string(CompactionProfileType::CUMULATIVE), "cumulative"); + EXPECT_STREQ(to_string(CompactionProfileType::FULL), "full"); + EXPECT_STREQ(to_string(CompactionProfileType::SINGLE_REPLICA), "single_replica"); + EXPECT_STREQ(to_string(CompactionProfileType::COLD_DATA), "cold_data"); + EXPECT_STREQ(to_string(CompactionProfileType::INDEX_CHANGE), "index_change"); +} + +TEST_F(CompactionProfileMgrTest, ConcurrentSafety) { + auto* mgr = CompactionProfileManager::instance(); + config::compaction_profile_max_records = 500; + + constexpr int kWriters = 4; + constexpr int kRecordsPerWriter = 100; + constexpr int kReaders = 2; + + std::atomic stop {false}; + std::vector threads; + + // Writers + for (int w = 0; w < kWriters; w++) { + threads.emplace_back([&, w]() { + for (int i = 0; i < kRecordsPerWriter; i++) { + mgr->add_record(make_record(w * 1000 + i, CompactionProfileType::BASE)); + } + }); + } + + // Readers + for (int r = 0; r < kReaders; r++) { + threads.emplace_back([&]() { + while (!stop.load(std::memory_order_relaxed)) { + auto records = mgr->get_records(); + // Just verify no crash + for (const auto& rec : records) { + (void)rec.tablet_id; + } + } + }); + } + + // Wait for writers + for (int i = 0; i < kWriters; i++) { + threads[i].join(); + } + stop.store(true, std::memory_order_relaxed); + + // Wait for readers + for (int i = kWriters; i < static_cast(threads.size()); i++) { + threads[i].join(); + } + + auto records = mgr->get_records(); + EXPECT_EQ(records.size(), kWriters * kRecordsPerWriter); +} + +TEST_F(CompactionProfileMgrTest, MonotonicallyIncreasingIds) { + auto* mgr = CompactionProfileManager::instance(); + int64_t prev_id = mgr->next_compaction_id(); + for (int i = 0; i < 100; i++) { + int64_t id = mgr->next_compaction_id(); + EXPECT_GT(id, prev_id); + prev_id = id; + } +} + +} // namespace doris diff --git a/regression-test/suites/compaction/test_compaction_profile_action.groovy b/regression-test/suites/compaction/test_compaction_profile_action.groovy new file mode 100644 index 00000000000000..afe3e59d3edb72 --- /dev/null +++ b/regression-test/suites/compaction/test_compaction_profile_action.groovy @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_compaction_profile_action") { + def tableName = "test_compaction_profile_action" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + id INT NOT NULL, + name STRING NOT NULL + ) DUPLICATE KEY (`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + // Insert multiple batches to create rowsets for compaction + for (i in 0..<10) { + sql """ INSERT INTO ${tableName} VALUES(${i}, "row_${i}") """ + } + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + + def tablets = sql_return_maparray """ show tablets from ${tableName} """ + def tablet = tablets[0] + def tablet_id = tablet.TabletId + def be_host = backendId_to_backendIP["${tablet.BackendId}"] + def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"] + def beHttpAddress = "${be_host}:${be_port}" + + // 1. Test: API returns valid JSON with empty or existing profiles + def (code1, text1, err1) = curl("GET", "${beHttpAddress}/api/compaction/profile") + assertEquals(0, code1) + def resp1 = parseJson(text1.trim()) + assertEquals("Success", resp1.status) + assertNotNull(resp1.compaction_profiles) + def profileCountBefore = resp1.compaction_profiles.size() + log.info("Profile count before compaction: ${profileCountBefore}") + + // 2. Trigger a cumulative compaction + def (code2, text2, err2) = be_run_cumulative_compaction(be_host, be_port, tablet_id) + log.info("Trigger compaction response: ${text2}") + + // Wait for compaction to finish + def running = true + def maxWait = 30 + while (running && maxWait > 0) { + Thread.sleep(1000) + def (code, out, err) = be_get_compaction_status(be_host, be_port, tablet_id) + def status = parseJson(out.trim()) + running = status.run_status + maxWait-- + } + assertFalse(running, "Compaction did not finish in time") + + // 3. Test: New profile record should appear + def (code3, text3, err3) = curl("GET", "${beHttpAddress}/api/compaction/profile?tablet_id=${tablet_id}") + assertEquals(0, code3) + def resp3 = parseJson(text3.trim()) + assertEquals("Success", resp3.status) + assertTrue(resp3.compaction_profiles.size() > 0, "Expected at least one profile record after compaction") + + def latestProfile = resp3.compaction_profiles[0] + log.info("Latest profile: ${latestProfile}") + + // Verify key fields exist and have reasonable values + assertNotNull(latestProfile.compaction_id) + assertNotNull(latestProfile.compaction_type) + assertEquals(Long.parseLong(tablet_id), latestProfile.tablet_id) + assertNotNull(latestProfile.start_time) + assertNotNull(latestProfile.end_time) + assertTrue(latestProfile.cost_time_ms >= 0) + assertTrue(latestProfile.success) + assertTrue(latestProfile.input_rowsets_count > 0) + assertTrue(latestProfile.input_row_num > 0) + assertTrue(latestProfile.input_rowsets_total_size > 0) + assertNotNull(latestProfile.output_version) + + // 4. Test: top_n parameter + def (code4, text4, err4) = curl("GET", "${beHttpAddress}/api/compaction/profile?top_n=1") + assertEquals(0, code4) + def resp4 = parseJson(text4.trim()) + assertTrue(resp4.compaction_profiles.size() <= 1) + + // 5. Test: invalid top_n returns 400 + def (code5, text5, err5) = curl("GET", "${beHttpAddress}/api/compaction/profile?top_n=-1") + assertTrue(text5.contains("top_n must be non-negative")) + + // 6. Test: non-existent tablet_id returns empty list + def (code6, text6, err6) = curl("GET", "${beHttpAddress}/api/compaction/profile?tablet_id=999999999") + assertEquals(0, code6) + def resp6 = parseJson(text6.trim()) + assertEquals(0, resp6.compaction_profiles.size()) + + sql """ DROP TABLE IF EXISTS ${tableName} """ +}