gc: config gc io limit dynamically (#5769) #5957

Merged
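This PR makes the GC write-flow limit (`gc.max_write_bytes_per_sec`) adjustable at runtime: the `IOLimiter` now lives behind an `Arc<Mutex<Option<IOLimiter>>>` shared between `GCWorker` and `GCRunner`, `GCWorker` gains a `change_io_limit` method, and the worker is threaded through `DebugService` and `Debugger` so the limit can be changed over the debug interface without restarting TiKV.

A rough sketch of how a client could drive the new path over the debug gRPC service. The kvproto types and grpcio calls below (`DebugClient`, `ModifyTikvConfigRequest` with `module`/`config_name`/`config_value`) are assumptions about the generated bindings of this era, not something shown in this diff; verify them against the actual crate.

```rust
use std::sync::Arc;

use grpcio::{ChannelBuilder, EnvBuilder};
use kvproto::debugpb::{ModifyTikvConfigRequest, MODULE};
use kvproto::debugpb_grpc::DebugClient;

fn set_gc_io_limit(addr: &str, value: &str) {
    // Connect to the TiKV debug service (registered on the same gRPC server
    // as the kv service, per the server.rs change below).
    let env = Arc::new(EnvBuilder::new().build());
    let channel = ChannelBuilder::new(env).connect(addr);
    let client = DebugClient::new(channel);

    // The new Debugger arm only accepts MODULE::SERVER with this exact name.
    let mut req = ModifyTikvConfigRequest::default();
    req.set_module(MODULE::SERVER);
    req.set_config_name("gc.max_write_bytes_per_sec".to_owned());
    req.set_config_value(value.to_owned());

    // Server side: DebugService -> Debugger::modify_tikv_config
    //              -> GCWorker::change_io_limit. A value of 0 removes the limiter.
    client.modify_tikv_config(&req).unwrap();
}

fn main() {
    set_gc_io_limit("127.0.0.1:20160", "10MB");
}
```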
13 changes: 7 additions & 6 deletions src/bin/tikv-ctl.rs
@@ -40,6 +40,8 @@ use tikv::config::TiKvConfig;
use tikv::pd::{Config as PdConfig, PdClient, RpcClient};
use tikv::raftstore::store::{keys, INIT_EPOCH_CONF_VER};
use tikv::server::debug::{BottommostLevelCompaction, Debugger, RegionInfo};
use tikv::server::ServerRaftStoreRouter;
use tikv::storage::kv::{raftkv::RaftKv, Engine};
use tikv::storage::Key;
use tikv_util::security::{SecurityConfig, SecurityManager};
use tikv_util::{escape, unescape};
@@ -88,11 +90,10 @@ fn new_debug_executor(
let raft_db =
rocks::util::new_engine_opt(&raft_path, raft_db_opts, raft_db_cf_opts).unwrap();

Box::new(Debugger::new(Engines::new(
Arc::new(kv_db),
Arc::new(raft_db),
cache.is_some(),
))) as Box<dyn DebugExecutor>
Box::new(Debugger::<RaftKv<ServerRaftStoreRouter>>::new(
Engines::new(Arc::new(kv_db), Arc::new(raft_db), cache.is_some()),
None,
)) as Box<dyn DebugExecutor>
}
(Some(remote), None) => Box::new(new_debug_client(remote, mgr)) as Box<dyn DebugExecutor>,
_ => unreachable!(),
@@ -746,7 +747,7 @@ impl DebugExecutor for DebugClient {
}
}

impl DebugExecutor for Debugger {
impl<E: Engine> DebugExecutor for Debugger<E> {
fn check_local_mode(&self) {}

fn get_all_meta_regions(&self) -> Vec<u64> {
54 changes: 47 additions & 7 deletions src/server/debug.rs
@@ -32,8 +32,10 @@ use crate::raftstore::store::{
write_peer_state,
};
use crate::raftstore::store::{keys, PeerStorage};
use crate::storage::gc_worker::GCWorker;
use crate::storage::mvcc::{Lock, LockType, Write, WriteType};
use crate::storage::types::Key;
use crate::storage::Engine;
use crate::storage::Iterator as EngineIterator;
use tikv_util::codec::bytes;
use tikv_util::collections::HashSet;
@@ -42,6 +44,8 @@ use tikv_util::escape;
use tikv_util::keybuilder::KeyBuilder;
use tikv_util::worker::Worker;

const GC_IO_LIMITER_CONFIG_NAME: &str = "gc.max_write_bytes_per_sec";

pub type Result<T> = result::Result<T, Error>;
type DBIterator = RocksIterator<Arc<DB>>;

@@ -128,13 +132,14 @@ impl From<BottommostLevelCompaction> for debugpb::BottommostLevelCompaction {
}

#[derive(Clone)]
pub struct Debugger {
pub struct Debugger<E: Engine> {
engines: Engines,
gc_worker: Option<GCWorker<E>>,
}

impl Debugger {
pub fn new(engines: Engines) -> Debugger {
Debugger { engines }
impl<E: Engine> Debugger<E> {
pub fn new(engines: Engines, gc_worker: Option<GCWorker<E>>) -> Debugger<E> {
Debugger { engines, gc_worker }
}

pub fn get_engine(&self) -> &Engines {
@@ -793,6 +798,22 @@ impl Debugger {
}
Ok(())
}
MODULE::SERVER => {
if config_name == GC_IO_LIMITER_CONFIG_NAME {
if let Ok(bytes_per_sec) = ReadableSize::from_str(config_value) {
return self
.gc_worker
.as_ref()
.expect("must be some")
.change_io_limit(bytes_per_sec.0)
.map_err(|e| Error::Other(e.into()));
}
}
Err(Error::InvalidArgument(format!(
"bad argument: {} {}",
config_name, config_value
)))
}
_ => Err(Error::NotFound(format!("unsupported module: {:?}", module))),
}
}
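The new `MODULE::SERVER` arm leans on `ReadableSize::from_str` to turn a human-readable value into a raw byte count before handing it to `GCWorker::change_io_limit`. A minimal sketch of that parsing step; the `tikv_util::config` path and the binary (1024-based) units are assumptions about this era's `ReadableSize`, so check them against the tree.

```rust
use std::str::FromStr;

use tikv_util::config::ReadableSize;

fn main() {
    // ".0" is the raw byte count that GCWorker::change_io_limit receives.
    let limit = ReadableSize::from_str("10MB").unwrap();
    assert_eq!(limit.0, 10 * 1024 * 1024);

    // Anything from_str rejects falls through to Error::InvalidArgument
    // in the match arm above.
    assert!(ReadableSize::from_str("not-a-size").is_err());
}
```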
@@ -1483,7 +1504,9 @@ mod tests {
use tempdir::TempDir;

use super::*;
use crate::storage::gc_worker::GCConfig;
use crate::storage::mvcc::{Lock, LockType};
use crate::storage::{RocksEngine as TestEngine, TestEngineBuilder};
use engine::rocks;
use engine::rocks::util::{new_engine_opt, CFOptions};
use engine::Mutable;
@@ -1582,7 +1605,7 @@ }
}
}

fn new_debugger() -> Debugger {
fn new_debugger() -> Debugger<TestEngine> {
let tmp = TempDir::new("test_debug").unwrap();
let path = tmp.path().to_str().unwrap();
let engine = Arc::new(
@@ -1601,10 +1624,13 @@

let shared_block_cache = false;
let engines = Engines::new(Arc::clone(&engine), engine, shared_block_cache);
Debugger::new(engines)
let test_engine = TestEngineBuilder::new().build().unwrap();
let mut gc_worker = GCWorker::new(test_engine, None, None, GCConfig::default());
gc_worker.start().unwrap();
Debugger::new(engines, Some(gc_worker))
}

impl Debugger {
impl Debugger<TestEngine> {
fn set_store_id(&self, store_id: u64) {
let mut ident = StoreIdent::new();
ident.set_store_id(store_id);
@@ -2265,4 +2291,18 @@
&keys[1..9],
);
}

#[test]
fn test_modify_gc_io_limit() {
let debugger = new_debugger();
debugger
.modify_tikv_config(MODULE::SERVER, "gc", "10MB")
.unwrap_err();
debugger
.modify_tikv_config(MODULE::STORAGE, GC_IO_LIMITER_CONFIG_NAME, "10MB")
.unwrap_err();
debugger
.modify_tikv_config(MODULE::SERVER, GC_IO_LIMITER_CONFIG_NAME, "10MB")
.unwrap();
}
}
3 changes: 2 additions & 1 deletion src/server/server.rs
@@ -97,6 +97,7 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
);
let snap_worker = Worker::new("snap-handler");

let gc_worker = storage.gc_worker.clone();
let kv_service = KvService::new(
storage,
cop,
@@ -123,7 +124,7 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
.register_service(create_tikv(kv_service));
sb = security_mgr.bind(sb, &ip, addr.port());
if let Some(engines) = debug_engines {
let debug_service = DebugService::new(engines, raft_router.clone());
let debug_service = DebugService::new(engines, raft_router.clone(), gc_worker);
sb = sb.register_service(create_debug(debug_service));
}
if let Some(service) = import_service {
16 changes: 9 additions & 7 deletions src/server/service/debug.rs
@@ -19,6 +19,8 @@ use protobuf::text_format::print_to_string;
use crate::raftstore::store::msg::Callback;
use crate::server::debug::{Debugger, Error};
use crate::server::transport::RaftStoreRouter;
use crate::storage::gc_worker::GCWorker;
use crate::storage::kv::Engine;
use tikv_util::metrics;

use tikv_alloc;
@@ -45,20 +47,20 @@ fn error_to_grpc_error(tag: &'static str, e: Error) -> GrpcError {

/// Service handles the RPC messages for the `Debug` service.
#[derive(Clone)]
pub struct Service<T: RaftStoreRouter> {
pub struct Service<T: RaftStoreRouter, E: Engine> {
pool: CpuPool,
debugger: Debugger,
debugger: Debugger<E>,
raft_router: T,
}

impl<T: RaftStoreRouter> Service<T> {
/// Constructs a new `Service` with `Engines` and a `RaftStoreRouter`.
pub fn new(engines: Engines, raft_router: T) -> Service<T> {
impl<T: RaftStoreRouter, E: Engine> Service<T, E> {
/// Constructs a new `Service` with `Engines`, a `RaftStoreRouter` and a `GCWorker`.
pub fn new(engines: Engines, raft_router: T, gc_worker: GCWorker<E>) -> Service<T, E> {
let pool = Builder::new()
.name_prefix(thd_name!("debugger"))
.pool_size(1)
.create();
let debugger = Debugger::new(engines);
let debugger = Debugger::new(engines, Some(gc_worker));
Service {
pool,
debugger,
@@ -84,7 +86,7 @@ impl<T: RaftStoreRouter> Service<T> {
}
}

impl<T: RaftStoreRouter + 'static> debugpb_grpc::Debug for Service<T> {
impl<T: RaftStoreRouter + 'static, E: Engine + 'static> debugpb_grpc::Debug for Service<T, E> {
fn get(&mut self, ctx: RpcContext<'_>, mut req: GetRequest, sink: UnarySink<GetResponse>) {
const TAG: &str = "debug_get";

79 changes: 67 additions & 12 deletions src/storage/gc_worker.rs
@@ -166,7 +166,7 @@ struct GCRunner<E: Engine> {
raft_store_router: Option<ServerRaftStoreRouter>,

/// Used to limit the write flow of GC.
limiter: Option<IOLimiter>,
limiter: Arc<Mutex<Option<IOLimiter>>>,

cfg: GCConfig,

@@ -178,13 +178,9 @@ impl<E: Engine> GCRunner<E> {
engine: E,
local_storage: Option<Arc<DB>>,
raft_store_router: Option<ServerRaftStoreRouter>,
limiter: Arc<Mutex<Option<IOLimiter>>>,
cfg: GCConfig,
) -> Self {
let limiter = if cfg.max_write_bytes_per_sec.0 > 0 {
Some(IOLimiter::new(cfg.max_write_bytes_per_sec.0))
} else {
None
};
Self {
engine,
local_storage,
@@ -296,7 +292,7 @@ impl<E: Engine> GCRunner<E> {
let write_size = txn.write_size();
let modifies = txn.into_modifies();
if !modifies.is_empty() {
if let Some(limiter) = &self.limiter {
if let Some(limiter) = &*self.limiter.lock().unwrap() {
limiter.request(write_size as i64);
}
self.engine.write(ctx, modifies)?;
@@ -1105,6 +1101,7 @@ pub struct GCWorker<E: Engine> {
raft_store_router: Option<ServerRaftStoreRouter>,

cfg: Option<GCConfig>,
limiter: Arc<Mutex<Option<IOLimiter>>>,

worker: Arc<Mutex<Worker<GCTask>>>,
worker_scheduler: worker::Scheduler<GCTask>,
@@ -1125,11 +1122,17 @@ impl<E: Engine> GCWorker<E> {
.create(),
));
let worker_scheduler = worker.lock().unwrap().scheduler();
let limiter = if cfg.max_write_bytes_per_sec.0 > 0 {
Some(IOLimiter::new(cfg.max_write_bytes_per_sec.0))
} else {
None
};
GCWorker {
engine,
local_storage,
raft_store_router,
cfg: Some(cfg),
limiter: Arc::new(Mutex::new(limiter)),
worker,
worker_scheduler,
gc_manager_handle: Arc::new(Mutex::new(None)),
@@ -1152,6 +1155,7 @@ impl<E: Engine> GCWorker<E> {
self.engine.clone(),
self.local_storage.take(),
self.raft_store_router.take(),
self.limiter.clone(),
self.cfg.take().unwrap(),
);
self.worker
Expand All @@ -1167,12 +1171,12 @@ impl<E: Engine> GCWorker<E> {
h.stop()?;
}
// Stop self.
let h = self.worker.lock().unwrap().stop().unwrap();
if let Err(e) = h.join() {
Err(box_err!("failed to join gc_worker handle, err: {:?}", e))
} else {
Ok(())
if let Some(h) = self.worker.lock().unwrap().stop() {
if let Err(e) = h.join() {
return Err(box_err!("failed to join gc_worker handle, err: {:?}", e));
}
}
Ok(())
}

pub fn async_gc(&self, ctx: Context, safe_point: u64, callback: Callback<()>) -> Result<()> {
@@ -1206,6 +1210,19 @@ impl<E: Engine> GCWorker<E> {
})
.or_else(handle_gc_task_schedule_error)
}

pub fn change_io_limit(&self, limit: u64) -> Result<()> {
let mut limiter = self.limiter.lock().unwrap();
if limit == 0 {
limiter.take();
} else {
limiter
.get_or_insert_with(|| IOLimiter::new(limit))
.set_bytes_per_second(limit as i64);
}
info!("GC io limit changed"; "max_write_bytes_per_sec" => limit);
Ok(())
}
}

#[cfg(test)]
@@ -1734,4 +1751,42 @@ mod tests {
invalid_cfg.batch_keys = 0;
assert!(invalid_cfg.validate().is_err());
}

#[test]
fn test_change_io_limit() {
let engine = TestEngineBuilder::new().build().unwrap();
let mut gc_worker = GCWorker::new(engine, None, None, GCConfig::default());
gc_worker.start().unwrap();
assert!(gc_worker.limiter.lock().unwrap().is_none());

// Enable io limit
gc_worker.change_io_limit(1024).unwrap();
assert_eq!(
gc_worker
.limiter
.lock()
.unwrap()
.as_ref()
.unwrap()
.get_bytes_per_second(),
1024
);

// Change io limit
gc_worker.change_io_limit(2048).unwrap();
assert_eq!(
gc_worker
.limiter
.lock()
.unwrap()
.as_ref()
.unwrap()
.get_bytes_per_second(),
2048,
);

// Disable io limit
gc_worker.change_io_limit(0).unwrap();
assert!(gc_worker.limiter.lock().unwrap().is_none());
}
}
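The runtime switch hinges on `GCWorker` and `GCRunner` sharing the limiter through `Arc<Mutex<Option<IOLimiter>>>` instead of the runner building its own `Option<IOLimiter>` once from the config. Below is a minimal standalone sketch of that pattern, with a stub `Limiter` in place of `engine::IOLimiter`; the names in the stub are illustrative only, not TiKV's API.

```rust
use std::sync::{Arc, Mutex};

// Stub standing in for engine::IOLimiter.
struct Limiter {
    bytes_per_sec: i64,
}

impl Limiter {
    fn new(limit: u64) -> Limiter {
        Limiter {
            bytes_per_sec: limit as i64,
        }
    }
    fn set_bytes_per_second(&mut self, limit: i64) {
        self.bytes_per_sec = limit;
    }
    fn request(&self, bytes: i64) {
        // A real limiter would block here to keep the rate under bytes_per_sec;
        // this stub just reports what would be throttled.
        println!("request {} bytes at limit {} B/s", bytes, self.bytes_per_sec);
    }
}

// Runner side: consults the shared slot before each batch of writes,
// mirroring GCRunner's check in gc().
struct Runner {
    limiter: Arc<Mutex<Option<Limiter>>>,
}

impl Runner {
    fn write_batch(&self, write_size: i64) {
        if let Some(limiter) = &*self.limiter.lock().unwrap() {
            limiter.request(write_size);
        }
        // ... perform the actual writes ...
    }
}

// Worker side: owns the same slot and reconfigures it at runtime,
// mirroring GCWorker::change_io_limit.
struct Worker {
    limiter: Arc<Mutex<Option<Limiter>>>,
}

impl Worker {
    fn change_io_limit(&self, limit: u64) {
        let mut limiter = self.limiter.lock().unwrap();
        if limit == 0 {
            // 0 disables the limit entirely.
            limiter.take();
        } else {
            limiter
                .get_or_insert_with(|| Limiter::new(limit))
                .set_bytes_per_second(limit as i64);
        }
    }
}

fn main() {
    let slot = Arc::new(Mutex::new(None));
    let worker = Worker {
        limiter: Arc::clone(&slot),
    };
    let runner = Runner { limiter: slot };

    runner.write_batch(4096); // slot is None: unthrottled
    worker.change_io_limit(1024); // enable the limit
    runner.write_batch(4096); // now goes through the shared limiter
    worker.change_io_limit(0); // disable it again
}
```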
2 changes: 1 addition & 1 deletion src/storage/mod.rs
@@ -630,7 +630,7 @@ pub struct Storage<E: Engine, L: LockMgr> {
read_pool: ReadPool,

/// Used to handle requests related to GC.
gc_worker: GCWorker<E>,
pub gc_worker: GCWorker<E>,

/// How many strong references. Thread pool and workers will be stopped
/// once there are no more references.