Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds,
[&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); }));
}

// TODO(lingbin): each task in the batch may have it own status or FE must check and
Expand Down
25 changes: 25 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,31 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info)
}
}

void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_info) {
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests at the same time
// and can not be processed.
if (config::report_random_wait) {
random_sleep(5);
}
(void)engine; // To be used in the future

TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.disks = true;

// TODO(deardeng): report disk info in cloud mode. And make it more clear
// that report CPU by using a separte report procedure
// or abstracting disk report as "host info report"
request.__set_num_cores(CpuInfo::num_cores());
request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
? config::pipeline_executor_size
: CpuInfo::num_cores());
bool succ = handle_report(request, master_info, "disk");
report_disk_total << 1;
report_disk_failed << !succ;
}

void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info) {
if (config::report_random_wait) {
random_sleep(5);
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ void report_task_callback(const TMasterInfo& master_info);

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info);

void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_info);

void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info);

void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
Expand Down