Skip to content

Commit

Permalink
chore(storage): remove compactor workload (risingwavelabs#12005)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Sep 6, 2023
1 parent a76e519 commit 61ef35b
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 37 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,6 @@ message CompactTaskProgress {
uint64 num_pending_write_io = 6;
}

// The measurement of the workload on a compactor to determine whether it is idle.
message CompactorWorkload {
uint32 cpu = 1;
}

message SubscribeCompactionEventRequest {
// Register provides the context_id of the corresponding Compactor.
message Register {
Expand Down
1 change: 0 additions & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ scopeguard = "1"
sled = "0.34.7"
spin = "0.9"
sync-point = { path = "../utils/sync-point" }
sysinfo = { version = "0.29", default-features = false }
tempfile = "3"
thiserror = "1"
# tikv-client = { git = "https://github.com/tikv/client-rust", rev = "5714b2", optional = true }
Expand Down
31 changes: 1 addition & 30 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub(super) mod task_progress;

use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Div;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
Expand All @@ -47,12 +46,10 @@ use risingwave_pb::hummock::subscribe_compaction_event_request::{
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
CompactTaskProgress, CompactorWorkload, SubscribeCompactionEventRequest,
SubscribeCompactionEventResponse,
CompactTaskProgress, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use sysinfo::{CpuRefreshKind, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;
Expand Down Expand Up @@ -321,9 +318,6 @@ pub fn start_compactor(
let task_progress = compactor_context.task_progress_manager.clone();
let periodic_event_update_interval = Duration::from_millis(1000);
let cpu_core_num = compactor_context.compaction_executor.worker_num() as u32;
let mut system =
System::new_with_specifics(RefreshKind::new().with_cpu(CpuRefreshKind::everything()));
let pid = sysinfo::get_current_pid().unwrap();
let running_task_count = compactor_context.running_task_count.clone();
let pull_task_ack = Arc::new(AtomicBool::new(true));

Expand All @@ -339,7 +333,6 @@ pub fn start_compactor(
let shutdown_map = CompactionShutdownMap::default();
let mut min_interval = tokio::time::interval(stream_retry_interval);
let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
let mut workload_collect_interval = tokio::time::interval(Duration::from_secs(60));

// This outer loop is to recreate stream.
'start_stream: loop {
Expand Down Expand Up @@ -375,7 +368,6 @@ pub fn start_compactor(

let executor = compactor_context.compaction_executor.clone();
let sstable_object_id_manager = sstable_object_id_manager.clone();
let mut last_workload = CompactorWorkload::default();

// This inner loop is to consume stream or report task progress.
let mut event_loop_iteration_now = Instant::now();
Expand Down Expand Up @@ -454,7 +446,6 @@ pub fn start_compactor(
}

tracing::info!(
cpu = %last_workload.cpu,
running_task_count = %running_task_count.load(Ordering::Relaxed),
pull_task_ack = %pull_task_ack.load(Ordering::Relaxed),
pending_pull_task_count = %pending_pull_task_count
Expand All @@ -463,26 +454,6 @@ pub fn start_compactor(
continue;
}

_ = workload_collect_interval.tick() => {
let refresh_result = system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu());
debug_assert!(refresh_result);
let cpu = if let Some(process) = system.process(pid) {
process.cpu_usage().div(cpu_core_num as f32) as u32
} else {
tracing::warn!("fail to get process pid {:?}", pid);
0
};

tracing::debug!("compactor cpu usage {cpu}");
let workload = CompactorWorkload {
cpu,
};

last_workload = workload.clone();

continue;
}

event = response_event_stream.next() => {
event
}
Expand Down

0 comments on commit 61ef35b

Please sign in to comment.