Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
#pragma once

#include <memory>
#include <optional>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "storage/compaction/compaction.h"
#include "storage/compaction_task_tracker.h"

namespace doris {

Expand All @@ -34,6 +36,10 @@ class CloudBaseCompaction : public CloudCompactionMixin {
Status execute_compact() override;
Status request_global_lock();

// Override reporting the profile category of this compaction task.
// Always yields an engaged optional holding CompactionProfileType::BASE.
std::optional<CompactionProfileType> profile_type() const override {
    return std::make_optional(CompactionProfileType::BASE);
}

void do_lease();

private:
Expand Down
12 changes: 7 additions & 5 deletions be/src/cloud/cloud_compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "storage/compaction/cumulative_compaction_policy.h"
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
#include "storage/compaction/full_compaction.h"
#include "storage/compaction_task_tracker.h"
#include "storage/olap_define.h"
#include "storage/storage_engine.h"
#include "storage/tablet/tablet_manager.h"
Expand Down Expand Up @@ -166,12 +167,13 @@ Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::stri

LOG(INFO) << "manual submit compaction task, tablet id: " << tablet_id
<< " table id: " << table_id;
// 3. submit compaction task
// 3. submit compaction task (trigger_method=1 for MANUAL)
RETURN_IF_ERROR(_engine.submit_compaction_task(
tablet, compaction_type == PARAM_COMPACTION_BASE ? CompactionType::BASE_COMPACTION
: compaction_type == PARAM_COMPACTION_CUMULATIVE
? CompactionType::CUMULATIVE_COMPACTION
: CompactionType::FULL_COMPACTION));
tablet,
compaction_type == PARAM_COMPACTION_BASE ? CompactionType::BASE_COMPACTION
: compaction_type == PARAM_COMPACTION_CUMULATIVE ? CompactionType::CUMULATIVE_COMPACTION
: CompactionType::FULL_COMPACTION,
/*trigger_method=*/1));

LOG(INFO) << "Manual compaction task is successfully triggered, tablet id: " << tablet_id
<< " table id: " << table_id;
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
#pragma once

#include <memory>
#include <optional>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "storage/compaction/compaction.h"
#include "storage/compaction_task_tracker.h"

namespace doris {
#include "common/compile_check_begin.h"
Expand All @@ -36,6 +38,10 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {
Status execute_compact() override;
Status request_global_lock();

// Override reporting the profile category of this compaction task.
// Always yields an engaged optional holding CompactionProfileType::CUMULATIVE.
std::optional<CompactionProfileType> profile_type() const override {
    return std::make_optional(CompactionProfileType::CUMULATIVE);
}

void do_lease();

int64_t get_input_rowsets_bytes() const { return _input_rowsets_total_size; }
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_full_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
#pragma once

#include <memory>
#include <optional>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "storage/compaction/compaction.h"
#include "storage/compaction_task_tracker.h"

namespace doris {

Expand All @@ -35,6 +37,10 @@ class CloudFullCompaction : public CloudCompactionMixin {
Status execute_compact() override;
Status request_global_lock();

// Override reporting the profile category of this compaction task.
// Always yields an engaged optional holding CompactionProfileType::FULL.
std::optional<CompactionProfileType> profile_type() const override {
    return std::make_optional(CompactionProfileType::FULL);
}

void do_lease();

protected:
Expand Down
111 changes: 104 additions & 7 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "runtime/memory/cache_manager.h"
#include "storage/compaction/cumulative_compaction_policy.h"
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
#include "storage/compaction_task_tracker.h"
#include "storage/storage_policy.h"
#include "util/parse_util.h"
#include "util/time.h"
Expand Down Expand Up @@ -796,7 +797,8 @@ Status CloudStorageEngine::_request_tablet_global_compaction_lock(
}
}

Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) {
Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet,
int trigger_method) {
using namespace std::chrono;
{
std::lock_guard lock(_compaction_mtx);
Expand All @@ -819,6 +821,26 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
_submitted_base_compactions.erase(tablet->tablet_id());
return st;
}
// Register task with CompactionTaskTracker as PENDING
auto* tracker = CompactionTaskTracker::instance();
int64_t compaction_id = compaction->compaction_id();
{
CompactionTaskInfo info;
info.compaction_id = compaction_id;
info.tablet_id = tablet->tablet_id();
info.table_id = tablet->table_id();
info.partition_id = tablet->partition_id();
info.compaction_type = CompactionProfileType::BASE;
info.status = CompactionTaskStatus::PENDING;
info.trigger_method = static_cast<TriggerMethod>(trigger_method);
info.scheduled_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
info.input_rowsets_count = compaction->input_rowsets_count();
info.input_row_num = compaction->input_row_num_value();
info.input_data_size = compaction->input_rowsets_data_size();
info.input_segments_num = compaction->input_segments_num_value();
tracker->register_task(std::move(info));
}
{
std::lock_guard lock(_compaction_mtx);
_submitted_base_compactions[tablet->tablet_id()] = compaction;
Expand All @@ -830,6 +852,8 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
g_base_compaction_running_task_count << 1;
signal::tablet_id = tablet->tablet_id();
Defer defer {[&]() {
// Idempotent cleanup: remove task from tracker
CompactionTaskTracker::instance()->remove_task(compaction_id);
g_base_compaction_running_task_count << -1;
std::lock_guard lock(_compaction_mtx);
_submitted_base_compactions.erase(tablet->tablet_id());
Expand All @@ -840,6 +864,13 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
auto st = _request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION, tablet,
compaction);
if (!st.ok()) return;
// Update tracker to RUNNING after acquiring global lock
{
RunningStats rs;
rs.start_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
}
st = compaction->execute_compact();
if (!st.ok()) {
// Error log has been output in `execute_compact`
Expand All @@ -852,6 +883,7 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
_base_compaction_thread_pool->get_queue_size());
if (!st.ok()) {
tracker->remove_task(compaction_id);
std::lock_guard lock(_compaction_mtx);
_submitted_base_compactions.erase(tablet->tablet_id());
return Status::InternalError("failed to submit base compaction, tablet_id={}",
Expand All @@ -860,7 +892,8 @@ Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
return st;
}

Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet) {
Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet,
int trigger_method) {
using namespace std::chrono;
{
std::lock_guard lock(_compaction_mtx);
Expand Down Expand Up @@ -893,6 +926,28 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
_tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
return st;
}
// Register task with CompactionTaskTracker as PENDING
// IMPORTANT: use compaction->compaction_id(), NOT tracker->next_compaction_id(),
// because the Compaction constructor already allocated an ID via the tracker.
auto* tracker = CompactionTaskTracker::instance();
int64_t compaction_id = compaction->compaction_id();
{
CompactionTaskInfo info;
info.compaction_id = compaction_id;
info.tablet_id = tablet->tablet_id();
info.table_id = tablet->table_id();
info.partition_id = tablet->partition_id();
info.compaction_type = CompactionProfileType::CUMULATIVE;
info.status = CompactionTaskStatus::PENDING;
info.trigger_method = static_cast<TriggerMethod>(trigger_method);
info.scheduled_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
info.input_rowsets_count = compaction->input_rowsets_count();
info.input_row_num = compaction->input_row_num_value();
info.input_data_size = compaction->input_rowsets_data_size();
info.input_segments_num = compaction->input_segments_num_value();
tracker->register_task(std::move(info));
}
{
std::lock_guard lock(_compaction_mtx);
_tablet_preparing_cumu_compaction.erase(tablet->tablet_id());
Expand Down Expand Up @@ -942,6 +997,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
Defer defer {[&]() {
DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.sleep",
{ sleep(5); })
// Idempotent cleanup: remove task from tracker
CompactionTaskTracker::instance()->remove_task(compaction_id);
std::lock_guard lock(_cumu_compaction_delay_mtx);
_cumu_compaction_thread_pool_used_threads--;
if (!is_large_task) {
Expand All @@ -956,6 +1013,13 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
auto st = _request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION,
tablet, compaction);
if (!st.ok()) return;
// Update tracker to RUNNING after acquiring global lock
{
RunningStats rs;
rs.start_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
}
do {
std::lock_guard lock(_cumu_compaction_delay_mtx);
_cumu_compaction_thread_pool_used_threads++;
Expand Down Expand Up @@ -1007,14 +1071,16 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
_cumu_compaction_thread_pool->get_queue_size());
if (!st.ok()) {
tracker->remove_task(compaction_id);
erase_submitted_cumu_compaction();
return Status::InternalError("failed to submit cumu compaction, tablet_id={}",
tablet->tablet_id());
}
return st;
}

Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet) {
Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet,
int trigger_method) {
using namespace std::chrono;
{
std::lock_guard lock(_compaction_mtx);
Expand All @@ -1036,6 +1102,26 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t
_submitted_full_compactions.erase(tablet->tablet_id());
return st;
}
// Register task with CompactionTaskTracker as PENDING
auto* tracker = CompactionTaskTracker::instance();
int64_t compaction_id = compaction->compaction_id();
{
CompactionTaskInfo info;
info.compaction_id = compaction_id;
info.tablet_id = tablet->tablet_id();
info.table_id = tablet->table_id();
info.partition_id = tablet->partition_id();
info.compaction_type = CompactionProfileType::FULL;
info.status = CompactionTaskStatus::PENDING;
info.trigger_method = static_cast<TriggerMethod>(trigger_method);
info.scheduled_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
info.input_rowsets_count = compaction->input_rowsets_count();
info.input_row_num = compaction->input_row_num_value();
info.input_data_size = compaction->input_rowsets_data_size();
info.input_segments_num = compaction->input_segments_num_value();
tracker->register_task(std::move(info));
}
{
std::lock_guard lock(_compaction_mtx);
_submitted_full_compactions[tablet->tablet_id()] = compaction;
Expand All @@ -1044,13 +1130,22 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t
g_full_compaction_running_task_count << 1;
signal::tablet_id = tablet->tablet_id();
Defer defer {[&]() {
// Idempotent cleanup: remove task from tracker
CompactionTaskTracker::instance()->remove_task(compaction_id);
g_full_compaction_running_task_count << -1;
std::lock_guard lock(_compaction_mtx);
_submitted_full_compactions.erase(tablet->tablet_id());
}};
auto st = _request_tablet_global_compaction_lock(ReaderType::READER_FULL_COMPACTION, tablet,
compaction);
if (!st.ok()) return;
// Update tracker to RUNNING after acquiring global lock
{
RunningStats rs;
rs.start_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
CompactionTaskTracker::instance()->update_to_running(compaction_id, rs);
}
st = compaction->execute_compact();
if (!st.ok()) {
// Error log has been output in `execute_compact`
Expand All @@ -1061,6 +1156,7 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t
_executing_full_compactions.erase(tablet->tablet_id());
});
if (!st.ok()) {
tracker->remove_task(compaction_id);
std::lock_guard lock(_compaction_mtx);
_submitted_full_compactions.erase(tablet->tablet_id());
return Status::InternalError("failed to submit full compaction, tablet_id={}",
Expand All @@ -1070,19 +1166,20 @@ Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& t
}

Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet,
CompactionType compaction_type) {
CompactionType compaction_type,
int trigger_method) {
DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION ||
compaction_type == CompactionType::BASE_COMPACTION ||
compaction_type == CompactionType::FULL_COMPACTION);
switch (compaction_type) {
case CompactionType::BASE_COMPACTION:
RETURN_IF_ERROR(_submit_base_compaction_task(tablet));
RETURN_IF_ERROR(_submit_base_compaction_task(tablet, trigger_method));
return Status::OK();
case CompactionType::CUMULATIVE_COMPACTION:
RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet));
RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet, trigger_method));
return Status::OK();
case CompactionType::FULL_COMPACTION:
RETURN_IF_ERROR(_submit_full_compaction_task(tablet));
RETURN_IF_ERROR(_submit_full_compaction_task(tablet, trigger_method));
return Status::OK();
default:
return Status::InternalError("unknown compaction type!");
Expand Down
10 changes: 6 additions & 4 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ class CloudStorageEngine final : public BaseStorageEngine {
void get_cumu_compaction(int64_t tablet_id,
std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res);

Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type);
Status submit_compaction_task(const CloudTabletSPtr& tablet, CompactionType compaction_type,
int trigger_method = 0);

Status get_compaction_status_json(std::string* result);

Expand Down Expand Up @@ -202,9 +203,10 @@ class CloudStorageEngine final : public BaseStorageEngine {
std::vector<CloudTabletSPtr> _generate_cloud_compaction_tasks(CompactionType compaction_type,
bool check_score);
Status _adjust_compaction_thread_num();
Status _submit_base_compaction_task(const CloudTabletSPtr& tablet);
Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet);
Status _submit_full_compaction_task(const CloudTabletSPtr& tablet);
Status _submit_base_compaction_task(const CloudTabletSPtr& tablet, int trigger_method = 0);
Status _submit_cumulative_compaction_task(const CloudTabletSPtr& tablet,
int trigger_method = 0);
Status _submit_full_compaction_task(const CloudTabletSPtr& tablet, int trigger_method = 0);
Status _request_tablet_global_compaction_lock(ReaderType compaction_type,
const CloudTabletSPtr& tablet,
std::shared_ptr<CloudCompactionMixin> compaction);
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,9 @@ DEFINE_mInt32(base_compaction_trace_threshold, "60");
DEFINE_mInt32(cumulative_compaction_trace_threshold, "10");
DEFINE_mBool(disable_compaction_trace_log, "true");

// Boolean config, default "true".
// NOTE(review): presumably switches CompactionTaskTracker bookkeeping on/off;
// the code that reads this flag is not visible here -- confirm.
DEFINE_mBool(enable_compaction_task_tracker, "true");
// Int32 config, default "10000".
// NOTE(review): presumably an upper bound on the number of records the
// compaction task tracker retains -- confirm against the tracker implementation.
DEFINE_mInt32(compaction_task_tracker_max_records, "10000");

// Interval to picking rowset to compact, in seconds
DEFINE_mInt64(pick_rowset_to_compact_interval_sec, "86400");

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,9 @@ DECLARE_mInt32(base_compaction_trace_threshold);
DECLARE_mInt32(cumulative_compaction_trace_threshold);
DECLARE_mBool(disable_compaction_trace_log);

// Declares the boolean config `enable_compaction_task_tracker`.
// NOTE(review): presumably switches CompactionTaskTracker bookkeeping on/off -- confirm.
DECLARE_mBool(enable_compaction_task_tracker);
// Declares the int32 config `compaction_task_tracker_max_records`.
// NOTE(review): presumably caps how many records the tracker retains -- confirm.
DECLARE_mInt32(compaction_task_tracker_max_records);

// Interval to picking rowset to compact, in seconds
DECLARE_mInt64(pick_rowset_to_compact_interval_sec);

Expand Down
Loading
Loading