Skip to content

Commit

Permalink
pd_worker, split_controller: introduce the new config and CPU collector registration mechanism (tikv#12942)

Browse files Browse the repository at this point in the history

ref tikv#12063, ref tikv#12593

Introduce the new split config and CPU collector registration mechanism.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored and ekexium committed Jul 13, 2022
1 parent 5c17cf7 commit 5fd9234
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 14 deletions.
2 changes: 1 addition & 1 deletion components/raftstore/src/store/worker/mod.rs
Expand Up @@ -42,5 +42,5 @@ pub use self::{
Bucket, BucketRange, KeyEntry, Runner as SplitCheckRunner, Task as SplitCheckTask,
},
split_config::{SplitConfig, SplitConfigManager},
split_controller::{AutoSplitController, ReadStats, WriteStats},
split_controller::{AutoSplitController, ReadStats, SplitConfigChange, WriteStats},
};
65 changes: 55 additions & 10 deletions components/raftstore/src/store/worker/pd.rs
Expand Up @@ -58,7 +58,7 @@ use crate::store::{
worker::{
query_stats::QueryStats,
split_controller::{SplitInfo, TOP_N},
AutoSplitController, ReadStats, WriteStats,
AutoSplitController, ReadStats, SplitConfigChange, WriteStats,
},
Callback, CasualMessage, Config, PeerMsg, RaftCmdExtraOpts, RaftCommand, RaftRouter,
RegionReadProgressRegistry, SignificantMsg, SnapManager, StoreInfo, StoreMsg, TxnExt,
Expand Down Expand Up @@ -183,6 +183,7 @@ where
id: u64,
duration: RaftstoreDuration,
},
UpdateRegionCPUCollector(bool),
RegionCPURecords(Arc<RawRecords>),
ReportMinResolvedTS {
store_id: u64,
Expand Down Expand Up @@ -349,7 +350,7 @@ where
log_wrappers::Value::key(split_key),
),
Task::AutoSplit { ref split_infos } => {
write!(f, "auto split split regions, num is {}", split_infos.len(),)
write!(f, "auto split split regions, num is {}", split_infos.len())
}
Task::AskBatchSplit {
ref region,
Expand Down Expand Up @@ -405,6 +406,12 @@ where
Task::UpdateSlowScore { id, ref duration } => {
write!(f, "compute slow score: id {}, duration {:?}", id, duration)
}
Task::UpdateRegionCPUCollector(is_register) => {
if is_register {
return write!(f, "register region cpu collector");
}
write!(f, "deregister region cpu collector")
}
Task::RegionCPURecords(ref cpu_records) => {
write!(f, "get region cpu records: {:?}", cpu_records)
}
Expand Down Expand Up @@ -599,7 +606,18 @@ where
scheduler: &Scheduler<Task<EK, ER>>,
) {
let start_time = TiInstant::now();
auto_split_controller.refresh_cfg();
match auto_split_controller.refresh_and_check_cfg() {
SplitConfigChange::UpdateRegionCPUCollector(is_register) => {
if let Err(e) = scheduler.schedule(Task::UpdateRegionCPUCollector(is_register)) {
error!(
"failed to register or deregister the region cpu collector";
"is_register" => is_register,
"err" => ?e,
);
}
}
SplitConfigChange::Noop => {}
}
let mut others = vec![];
while let Ok(other) = receiver.try_recv() {
others.push(other);
Expand Down Expand Up @@ -842,7 +860,8 @@ where
scheduler: Scheduler<Task<EK, ER>>,
stats_monitor: StatsMonitor<EK, ER>,

_region_cpu_records_collector: CollectorGuard,
collector_reg_handle: CollectorRegHandle,
region_cpu_records_collector: Option<CollectorGuard>,
// region_id -> total_cpu_time_ms (since last region heartbeat)
region_cpu_records: HashMap<u64, u32>,

Expand Down Expand Up @@ -879,6 +898,18 @@ where
region_read_progress: RegionReadProgressRegistry,
health_service: Option<HealthService>,
) -> Runner<EK, ER, T> {
// Register the region CPU records collector.
let mut region_cpu_records_collector = None;
if auto_split_controller
.cfg
.region_cpu_overload_threshold_ratio
> 0.0
{
region_cpu_records_collector = Some(collector_reg_handle.register(
Box::new(RegionCPUMeteringCollector::new(scheduler.clone())),
false,
));
}
let interval = store_heartbeat_interval / Self::INTERVAL_DIVISOR;
let mut stats_monitor = StatsMonitor::new(
interval,
Expand All @@ -889,11 +920,6 @@ where
error!("failed to start stats collector, error = {:?}", e);
}

let _region_cpu_records_collector = collector_reg_handle.register(
Box::new(RegionCPUMeteringCollector::new(scheduler.clone())),
true,
);

Runner {
store_id,
pd_client,
Expand All @@ -905,7 +931,8 @@ where
start_ts: UnixSecs::now(),
scheduler,
stats_monitor,
_region_cpu_records_collector,
collector_reg_handle,
region_cpu_records_collector,
region_cpu_records: HashMap::default(),
concurrency_manager,
snap_mgr,
Expand Down Expand Up @@ -968,6 +995,21 @@ where
self.remote.spawn(f);
}

/// Handles a `Task::UpdateRegionCPUCollector` by registering or deregistering
/// the region CPU records collector. Both directions are idempotent: a repeated
/// register keeps the existing collector, a repeated deregister is a no-op.
fn handle_update_region_cpu_collector(&mut self, is_register: bool) {
    if is_register {
        // Already registered: keep the existing collector guard untouched.
        if self.region_cpu_records_collector.is_none() {
            self.region_cpu_records_collector = Some(self.collector_reg_handle.register(
                Box::new(RegionCPUMeteringCollector::new(self.scheduler.clone())),
                false,
            ));
        }
    } else {
        // Dropping the guard is what deregisters the collector.
        self.region_cpu_records_collector = None;
    }
}

// Note: The parameter doesn't contain `self` because this function may
// be called in an asynchronous context.
fn handle_ask_batch_split(
Expand Down Expand Up @@ -1928,6 +1970,9 @@ where
} => self.handle_update_max_timestamp(region_id, initial_status, txn_ext),
Task::QueryRegionLeader { region_id } => self.handle_query_region_leader(region_id),
Task::UpdateSlowScore { id, duration } => self.slow_score.record(id, duration.sum()),
Task::UpdateRegionCPUCollector(is_register) => {
self.handle_update_region_cpu_collector(is_register)
}
Task::RegionCPURecords(records) => self.handle_region_cpu_records(records),
Task::ReportMinResolvedTS {
store_id,
Expand Down
32 changes: 32 additions & 0 deletions components/raftstore/src/store/worker/split_config.rs
Expand Up @@ -19,6 +19,19 @@ const DEFAULT_SPLIT_BALANCE_SCORE: f64 = 0.25;
// We get contained score by sample.contained/(sample.right+sample.left+sample.contained). It will be used to avoid to split regions requested by range.
const DEFAULT_SPLIT_CONTAINED_SCORE: f64 = 0.5;

// If the `split_balance_score` and `split_contained_score` above cannot be
// satisfied, we fall back to splitting the region according to its CPU load,
// governed by the three thresholds below.
//
// When the gRPC poll thread CPU usage is higher than
// `grpc poll thread count * DEFAULT_GRPC_THREAD_CPU_OVERLOAD_THRESHOLD_RATIO`,
// the CPU-based split is suppressed — even if the other two thresholds are
// exceeded — to avoid driving the gRPC poll CPU usage even higher.
const DEFAULT_GRPC_THREAD_CPU_OVERLOAD_THRESHOLD_RATIO: f64 = 0.5;
// When the Unified Read Pool thread CPU usage is higher than
// `pool thread count * DEFAULT_UNIFIED_READ_POOL_THREAD_CPU_OVERLOAD_THRESHOLD_RATIO`,
// the CPU-based split starts checking and recording the top hot-CPU regions.
const DEFAULT_UNIFIED_READ_POOL_THREAD_CPU_OVERLOAD_THRESHOLD_RATIO: f64 = 0.8;
// When the Unified Read Pool is hot and a region's CPU usage reaches this
// fraction of the pool's usage, the region is added to the hot-region list and
// may later be split as the top hot-CPU region.
pub(crate) const REGION_CPU_OVERLOAD_THRESHOLD_RATIO: f64 = 0.25;

lazy_static! {
static ref SPLIT_CONFIG: Mutex<Option<Arc<VersionTrack<SplitConfig>>>> = Mutex::new(None);
}
Expand All @@ -43,6 +56,11 @@ pub struct SplitConfig {
pub sample_num: usize,
pub sample_threshold: u64,
pub byte_threshold: usize,
#[doc(hidden)]
pub grpc_thread_cpu_overload_threshold_ratio: f64,
#[doc(hidden)]
pub unified_read_pool_thread_cpu_overload_threshold_ratio: f64,
pub region_cpu_overload_threshold_ratio: f64,
// deprecated.
#[online_config(skip)]
#[doc(hidden)]
Expand All @@ -65,6 +83,11 @@ impl Default for SplitConfig {
sample_num: DEFAULT_SAMPLE_NUM,
sample_threshold: DEFAULT_SAMPLE_THRESHOLD,
byte_threshold: DEFAULT_BYTE_THRESHOLD,
grpc_thread_cpu_overload_threshold_ratio:
DEFAULT_GRPC_THREAD_CPU_OVERLOAD_THRESHOLD_RATIO,
unified_read_pool_thread_cpu_overload_threshold_ratio:
DEFAULT_UNIFIED_READ_POOL_THREAD_CPU_OVERLOAD_THRESHOLD_RATIO,
region_cpu_overload_threshold_ratio: REGION_CPU_OVERLOAD_THRESHOLD_RATIO,
size_threshold: None, // deprecated.
key_threshold: None, // deprecated.
}
Expand All @@ -87,6 +110,15 @@ impl SplitConfig {
("sample_num should be less than qps_threshold for load-base-split.").into(),
);
}
if self.grpc_thread_cpu_overload_threshold_ratio > 1.0
|| self.grpc_thread_cpu_overload_threshold_ratio < 0.0
|| self.unified_read_pool_thread_cpu_overload_threshold_ratio > 1.0
|| self.unified_read_pool_thread_cpu_overload_threshold_ratio < 0.0
|| self.region_cpu_overload_threshold_ratio > 1.0
|| self.region_cpu_overload_threshold_ratio < 0.0
{
return Err(("threshold ratio should be between 0 and 1.").into());
}
Ok(())
}
}
Expand Down
97 changes: 94 additions & 3 deletions components/raftstore/src/store/worker/split_controller.rs
Expand Up @@ -519,10 +519,16 @@ pub struct SplitInfo {
pub peer: Peer,
}

/// Side effect required after refreshing the `SplitConfig`, reported by
/// `AutoSplitController::refresh_and_check_cfg` to its caller.
//
// The variants only carry a `bool`, so the enum is trivially `Copy`/`Eq`;
// deriving them lets callers pass it by value and match/compare freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SplitConfigChange {
    /// No collector-related config change happened.
    Noop,
    /// The region CPU collector must be registered (`true`) or
    /// deregistered (`false`).
    UpdateRegionCPUCollector(bool),
}

/// Controller for load-base splitting: keeps per-region load recorders and the
/// current `SplitConfig`, which it refreshes via `refresh_and_check_cfg`.
pub struct AutoSplitController {
    // region_id -> Recorder holding that region's sampled load statistics.
    pub recorders: HashMap<u64, Recorder>,
    // Current split config snapshot, updated from `cfg_tracker`.
    // NOTE(review): exposed as `pub` so the pd worker can read
    // `region_cpu_overload_threshold_ratio` when deciding whether to register
    // the region CPU collector — confirm no wider visibility is needed.
    pub cfg: SplitConfig,
    // Observes online changes published to the shared `SplitConfig`.
    cfg_tracker: Tracker<SplitConfig>,
}

Expand Down Expand Up @@ -645,19 +651,36 @@ impl AutoSplitController {
});
}

pub fn refresh_cfg(&mut self) {
/// Applies any pending `SplitConfig` update and reports whether the region
/// CPU collector must be registered or deregistered as a consequence.
pub fn refresh_and_check_cfg(&mut self) -> SplitConfigChange {
    let incoming = match self.cfg_tracker.any_new() {
        Some(incoming) => incoming,
        // No pending config change: keep the current config as-is.
        None => return SplitConfigChange::Noop,
    };
    let old_ratio = self.cfg.region_cpu_overload_threshold_ratio;
    let new_ratio = incoming.region_cpu_overload_threshold_ratio;
    // The collector is only needed while the CPU overload threshold is
    // positive, so (de)registration is triggered when the threshold crosses
    // zero in either direction.
    let change = if old_ratio <= 0.0 && new_ratio > 0.0 {
        SplitConfigChange::UpdateRegionCPUCollector(true)
    } else if old_ratio > 0.0 && new_ratio <= 0.0 {
        SplitConfigChange::UpdateRegionCPUCollector(false)
    } else {
        SplitConfigChange::Noop
    };
    self.cfg = incoming.clone();
    change
}
}

#[cfg(test)]
mod tests {
use online_config::{ConfigChange, ConfigManager, ConfigValue};
use tikv_util::config::VersionTrack;
use txn_types::Key;

use super::*;
use crate::store::{util::build_key_range, worker::split_config::DEFAULT_SAMPLE_NUM};
use crate::store::{
util::build_key_range,
worker::split_config::{DEFAULT_SAMPLE_NUM, REGION_CPU_OVERLOAD_THRESHOLD_RATIO},
};

enum Position {
Left,
Expand Down Expand Up @@ -1201,6 +1224,74 @@ mod tests {
qps_stats
}

#[test]
fn test_refresh_and_check_cfg() {
    let mut split_cfg_manager =
        SplitConfigManager::new(Arc::new(VersionTrack::new(SplitConfig::default())));
    let mut controller = AutoSplitController::new(split_cfg_manager.clone());
    // Fresh controller: no pending change, default threshold in effect.
    assert_eq!(controller.refresh_and_check_cfg(), SplitConfigChange::Noop);
    assert_eq!(
        controller.cfg.region_cpu_overload_threshold_ratio,
        REGION_CPU_OVERLOAD_THRESHOLD_RATIO
    );
    // Zeroing the ratio must deregister the CPU collector exactly once.
    dispatch_split_cfg_change(
        &mut split_cfg_manager,
        "region_cpu_overload_threshold_ratio",
        ConfigValue::F64(0.0),
    );
    assert_eq!(
        controller.refresh_and_check_cfg(),
        SplitConfigChange::UpdateRegionCPUCollector(false),
    );
    assert_eq!(controller.cfg.region_cpu_overload_threshold_ratio, 0.0);
    assert_eq!(controller.refresh_and_check_cfg(), SplitConfigChange::Noop);
    // Restoring a positive ratio must register the collector exactly once.
    dispatch_split_cfg_change(
        &mut split_cfg_manager,
        "region_cpu_overload_threshold_ratio",
        ConfigValue::F64(REGION_CPU_OVERLOAD_THRESHOLD_RATIO),
    );
    assert_eq!(
        controller.refresh_and_check_cfg(),
        SplitConfigChange::UpdateRegionCPUCollector(true),
    );
    assert_eq!(
        controller.cfg.region_cpu_overload_threshold_ratio,
        REGION_CPU_OVERLOAD_THRESHOLD_RATIO
    );
    assert_eq!(controller.refresh_and_check_cfg(), SplitConfigChange::Noop);
}

// Test helper: pushes a single-entry config change through the manager.
fn dispatch_split_cfg_change(
    split_cfg_manager: &mut SplitConfigManager,
    cfg_name: &str,
    cfg_value: ConfigValue,
) {
    let mut change = ConfigChange::new();
    change.insert(cfg_name.to_owned(), cfg_value);
    split_cfg_manager.dispatch(change).unwrap();
}

#[bench]
fn samples_evaluate(b: &mut test::Bencher) {
let mut samples = Samples(vec![Sample::new(b"c")]);
Expand Down

0 comments on commit 5fd9234

Please sign in to comment.