gc: config gc io limit dynamically (#5769)
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
youjiali1995 committed Nov 19, 2019
1 parent 8a64c6b commit 5b6b36d
Showing 6 changed files with 133 additions and 34 deletions.
13 changes: 7 additions & 6 deletions src/bin/tikv-ctl.rs
@@ -40,6 +40,8 @@ use tikv::config::TiKvConfig;
use tikv::pd::{Config as PdConfig, PdClient, RpcClient};
use tikv::raftstore::store::{keys, INIT_EPOCH_CONF_VER};
use tikv::server::debug::{BottommostLevelCompaction, Debugger, RegionInfo};
use tikv::server::ServerRaftStoreRouter;
use tikv::storage::kv::{raftkv::RaftKv, Engine};
use tikv::storage::Key;
use tikv_util::security::{SecurityConfig, SecurityManager};
use tikv_util::{escape, unescape};
@@ -88,11 +90,10 @@ fn new_debug_executor(
let raft_db =
rocks::util::new_engine_opt(&raft_path, raft_db_opts, raft_db_cf_opts).unwrap();

Box::new(Debugger::new(Engines::new(
Arc::new(kv_db),
Arc::new(raft_db),
cache.is_some(),
))) as Box<dyn DebugExecutor>
Box::new(Debugger::<RaftKv<ServerRaftStoreRouter>>::new(
Engines::new(Arc::new(kv_db), Arc::new(raft_db), cache.is_some()),
None,
)) as Box<dyn DebugExecutor>
}
(Some(remote), None) => Box::new(new_debug_client(remote, mgr)) as Box<dyn DebugExecutor>,
_ => unreachable!(),
@@ -746,7 +747,7 @@ impl DebugExecutor for DebugClient {
}
}

impl DebugExecutor for Debugger {
impl<E: Engine> DebugExecutor for Debugger<E> {
fn check_local_mode(&self) {}

fn get_all_meta_regions(&self) -> Vec<u64> {
54 changes: 47 additions & 7 deletions src/server/debug.rs
@@ -32,8 +32,10 @@ use crate::raftstore::store::{
write_peer_state,
};
use crate::raftstore::store::{keys, PeerStorage};
use crate::storage::gc_worker::GCWorker;
use crate::storage::mvcc::{Lock, LockType, Write, WriteType};
use crate::storage::types::Key;
use crate::storage::Engine;
use crate::storage::Iterator as EngineIterator;
use tikv_util::codec::bytes;
use tikv_util::collections::HashSet;
@@ -42,6 +44,8 @@ use tikv_util::escape;
use tikv_util::keybuilder::KeyBuilder;
use tikv_util::worker::Worker;

const GC_IO_LIMITER_CONFIG_NAME: &str = "gc.max_write_bytes_per_sec";

pub type Result<T> = result::Result<T, Error>;
type DBIterator = RocksIterator<Arc<DB>>;

@@ -128,13 +132,14 @@ impl From<BottommostLevelCompaction> for debugpb::BottommostLevelCompaction {
}

#[derive(Clone)]
pub struct Debugger {
pub struct Debugger<E: Engine> {
engines: Engines,
gc_worker: Option<GCWorker<E>>,
}

impl Debugger {
pub fn new(engines: Engines) -> Debugger {
Debugger { engines }
impl<E: Engine> Debugger<E> {
pub fn new(engines: Engines, gc_worker: Option<GCWorker<E>>) -> Debugger<E> {
Debugger { engines, gc_worker }
}

pub fn get_engine(&self) -> &Engines {
@@ -793,6 +798,22 @@ impl Debugger {
}
Ok(())
}
MODULE::SERVER => {
if config_name == GC_IO_LIMITER_CONFIG_NAME {
if let Ok(bytes_per_sec) = ReadableSize::from_str(config_value) {
return self
.gc_worker
.as_ref()
.expect("must be some")
.change_io_limit(bytes_per_sec.0)
.map_err(|e| Error::Other(e.into()));
}
}
Err(Error::InvalidArgument(format!(
"bad argument: {} {}",
config_name, config_value
)))
}
_ => Err(Error::NotFound(format!("unsupported module: {:?}", module))),
}
}
@@ -1483,7 +1504,9 @@ mod tests {
use tempdir::TempDir;

use super::*;
use crate::storage::gc_worker::GCConfig;
use crate::storage::mvcc::{Lock, LockType};
use crate::storage::{RocksEngine as TestEngine, TestEngineBuilder};
use engine::rocks;
use engine::rocks::util::{new_engine_opt, CFOptions};
use engine::Mutable;
@@ -1582,7 +1605,7 @@
}
}

fn new_debugger() -> Debugger {
fn new_debugger() -> Debugger<TestEngine> {
let tmp = TempDir::new("test_debug").unwrap();
let path = tmp.path().to_str().unwrap();
let engine = Arc::new(
@@ -1601,10 +1624,13 @@

let shared_block_cache = false;
let engines = Engines::new(Arc::clone(&engine), engine, shared_block_cache);
Debugger::new(engines)
let test_engine = TestEngineBuilder::new().build().unwrap();
let mut gc_worker = GCWorker::new(test_engine, None, None, GCConfig::default());
gc_worker.start().unwrap();
Debugger::new(engines, Some(gc_worker))
}

impl Debugger {
impl Debugger<TestEngine> {
fn set_store_id(&self, store_id: u64) {
let mut ident = StoreIdent::new();
ident.set_store_id(store_id);
@@ -2265,4 +2291,18 @@
&keys[1..9],
);
}

#[test]
fn test_modify_gc_io_limit() {
let debugger = new_debugger();
debugger
.modify_tikv_config(MODULE::SERVER, "gc", "10MB")
.unwrap_err();
debugger
.modify_tikv_config(MODULE::STORAGE, GC_IO_LIMITER_CONFIG_NAME, "10MB")
.unwrap_err();
debugger
.modify_tikv_config(MODULE::SERVER, GC_IO_LIMITER_CONFIG_NAME, "10MB")
.unwrap();
}
}
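
The new MODULE::SERVER arm above only recognizes gc.max_write_bytes_per_sec: it parses the value as a ReadableSize and forwards it to the GC worker, rejecting anything else as a bad argument. Below is a minimal, self-contained sketch of that dispatch, assuming a simplified parse_size helper and a ToyGcWorker type as stand-ins for ReadableSize and GCWorker (neither is TiKV's real API).

// Hypothetical stand-ins; not TiKV's real types.
const GC_IO_LIMITER_CONFIG_NAME: &str = "gc.max_write_bytes_per_sec";

// Simplified size parser: plain bytes or a KB/MB/GB suffix with binary units,
// roughly mirroring ReadableSize's behaviour.
fn parse_size(s: &str) -> Result<u64, String> {
    let s = s.trim();
    let (num, mul): (&str, u64) = if let Some(n) = s.strip_suffix("GB") {
        (n, 1024 * 1024 * 1024)
    } else if let Some(n) = s.strip_suffix("MB") {
        (n, 1024 * 1024)
    } else if let Some(n) = s.strip_suffix("KB") {
        (n, 1024)
    } else {
        (s, 1)
    };
    num.trim()
        .parse::<u64>()
        .map(|v| v * mul)
        .map_err(|e| e.to_string())
}

struct ToyGcWorker;

impl ToyGcWorker {
    fn change_io_limit(&self, limit: u64) -> Result<(), String> {
        println!("GC io limit changed to {} bytes/sec", limit);
        Ok(())
    }
}

// Mirrors the shape of the new MODULE::SERVER arm in Debugger::modify_tikv_config.
fn modify_server_config(
    gc_worker: &ToyGcWorker,
    config_name: &str,
    config_value: &str,
) -> Result<(), String> {
    if config_name == GC_IO_LIMITER_CONFIG_NAME {
        if let Ok(bytes_per_sec) = parse_size(config_value) {
            return gc_worker.change_io_limit(bytes_per_sec);
        }
    }
    Err(format!("bad argument: {} {}", config_name, config_value))
}

fn main() {
    let gc_worker = ToyGcWorker;
    modify_server_config(&gc_worker, GC_IO_LIMITER_CONFIG_NAME, "10MB").unwrap();
    assert!(modify_server_config(&gc_worker, "gc", "10MB").is_err());
}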
3 changes: 2 additions & 1 deletion src/server/server.rs
@@ -97,6 +97,7 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
);
let snap_worker = Worker::new("snap-handler");

let gc_worker = storage.gc_worker.clone();
let kv_service = KvService::new(
storage,
cop,
@@ -123,7 +124,7 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
.register_service(create_tikv(kv_service));
sb = security_mgr.bind(sb, &ip, addr.port());
if let Some(engines) = debug_engines {
let debug_service = DebugService::new(engines, raft_router.clone());
let debug_service = DebugService::new(engines, raft_router.clone(), gc_worker);
sb = sb.register_service(create_debug(debug_service));
}
if let Some(service) = import_service {
16 changes: 9 additions & 7 deletions src/server/service/debug.rs
@@ -19,6 +19,8 @@ use protobuf::text_format::print_to_string;
use crate::raftstore::store::msg::Callback;
use crate::server::debug::{Debugger, Error};
use crate::server::transport::RaftStoreRouter;
use crate::storage::gc_worker::GCWorker;
use crate::storage::kv::Engine;
use tikv_util::metrics;

use tikv_alloc;
@@ -45,20 +47,20 @@ fn error_to_grpc_error(tag: &'static str, e: Error) -> GrpcError {

/// Service handles the RPC messages for the `Debug` service.
#[derive(Clone)]
pub struct Service<T: RaftStoreRouter> {
pub struct Service<T: RaftStoreRouter, E: Engine> {
pool: CpuPool,
debugger: Debugger,
debugger: Debugger<E>,
raft_router: T,
}

impl<T: RaftStoreRouter> Service<T> {
/// Constructs a new `Service` with `Engines` and a `RaftStoreRouter`.
pub fn new(engines: Engines, raft_router: T) -> Service<T> {
impl<T: RaftStoreRouter, E: Engine> Service<T, E> {
/// Constructs a new `Service` with `Engines`, a `RaftStoreRouter` and a `GCWorker`.
pub fn new(engines: Engines, raft_router: T, gc_worker: GCWorker<E>) -> Service<T, E> {
let pool = Builder::new()
.name_prefix(thd_name!("debugger"))
.pool_size(1)
.create();
let debugger = Debugger::new(engines);
let debugger = Debugger::new(engines, Some(gc_worker));
Service {
pool,
debugger,
@@ -84,7 +86,7 @@ impl<T: RaftStoreRouter> Service<T> {
}
}

impl<T: RaftStoreRouter + 'static> debugpb_grpc::Debug for Service<T> {
impl<T: RaftStoreRouter + 'static, E: Engine + 'static> debugpb_grpc::Debug for Service<T, E> {
fn get(&mut self, ctx: RpcContext<'_>, mut req: GetRequest, sink: UnarySink<GetResponse>) {
const TAG: &str = "debug_get";

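
This file is plumbing: Service gains an Engine type parameter so it can own a Debugger<E>, and its constructor now receives the GCWorker<E> that Server clones out of Storage. A rough sketch of that generic threading, using toy traits and types (Engine, RaftStoreRouter, GcWorker, Debugger, and DebugService here are illustrative stand-ins, not TiKV's definitions).

// Toy stand-ins; none of these are TiKV's real traits or types.
trait Engine: Clone {}
trait RaftStoreRouter: Clone {}

#[derive(Clone)]
struct GcWorker<E: Engine> {
    engine: E,
}

#[derive(Clone)]
struct Debugger<E: Engine> {
    gc_worker: Option<GcWorker<E>>,
}

#[derive(Clone)]
struct DebugService<T: RaftStoreRouter, E: Engine> {
    debugger: Debugger<E>,
    raft_router: T,
}

impl<T: RaftStoreRouter, E: Engine> DebugService<T, E> {
    // Mirrors the shape of Service::new(engines, raft_router, gc_worker): the worker
    // handle comes in from the server and is tucked into the Debugger.
    fn new(raft_router: T, gc_worker: GcWorker<E>) -> Self {
        DebugService {
            debugger: Debugger { gc_worker: Some(gc_worker) },
            raft_router,
        }
    }
}

#[derive(Clone)]
struct MockEngine;
impl Engine for MockEngine {}

#[derive(Clone)]
struct MockRouter;
impl RaftStoreRouter for MockRouter {}

fn main() {
    let gc_worker = GcWorker { engine: MockEngine };
    let _service = DebugService::new(MockRouter, gc_worker);
}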
79 changes: 67 additions & 12 deletions src/storage/gc_worker.rs
@@ -166,7 +166,7 @@ struct GCRunner<E: Engine> {
raft_store_router: Option<ServerRaftStoreRouter>,

/// Used to limit the write flow of GC.
limiter: Option<IOLimiter>,
limiter: Arc<Mutex<Option<IOLimiter>>>,

cfg: GCConfig,

@@ -178,13 +178,9 @@ impl<E: Engine> GCRunner<E> {
engine: E,
local_storage: Option<Arc<DB>>,
raft_store_router: Option<ServerRaftStoreRouter>,
limiter: Arc<Mutex<Option<IOLimiter>>>,
cfg: GCConfig,
) -> Self {
let limiter = if cfg.max_write_bytes_per_sec.0 > 0 {
Some(IOLimiter::new(cfg.max_write_bytes_per_sec.0))
} else {
None
};
Self {
engine,
local_storage,
@@ -296,7 +292,7 @@ impl<E: Engine> GCRunner<E> {
let write_size = txn.write_size();
let modifies = txn.into_modifies();
if !modifies.is_empty() {
if let Some(limiter) = &self.limiter {
if let Some(limiter) = &*self.limiter.lock().unwrap() {
limiter.request(write_size as i64);
}
self.engine.write(ctx, modifies)?;
@@ -1105,6 +1101,7 @@ pub struct GCWorker<E: Engine> {
raft_store_router: Option<ServerRaftStoreRouter>,

cfg: Option<GCConfig>,
limiter: Arc<Mutex<Option<IOLimiter>>>,

worker: Arc<Mutex<Worker<GCTask>>>,
worker_scheduler: worker::Scheduler<GCTask>,
@@ -1125,11 +1122,17 @@ impl<E: Engine> GCWorker<E> {
.create(),
));
let worker_scheduler = worker.lock().unwrap().scheduler();
let limiter = if cfg.max_write_bytes_per_sec.0 > 0 {
Some(IOLimiter::new(cfg.max_write_bytes_per_sec.0))
} else {
None
};
GCWorker {
engine,
local_storage,
raft_store_router,
cfg: Some(cfg),
limiter: Arc::new(Mutex::new(limiter)),
worker,
worker_scheduler,
gc_manager_handle: Arc::new(Mutex::new(None)),
@@ -1152,6 +1155,7 @@ impl<E: Engine> GCWorker<E> {
self.engine.clone(),
self.local_storage.take(),
self.raft_store_router.take(),
self.limiter.clone(),
self.cfg.take().unwrap(),
);
self.worker
@@ -1167,12 +1171,12 @@ impl<E: Engine> GCWorker<E> {
h.stop()?;
}
// Stop self.
let h = self.worker.lock().unwrap().stop().unwrap();
if let Err(e) = h.join() {
Err(box_err!("failed to join gc_worker handle, err: {:?}", e))
} else {
Ok(())
if let Some(h) = self.worker.lock().unwrap().stop() {
if let Err(e) = h.join() {
return Err(box_err!("failed to join gc_worker handle, err: {:?}", e));
}
}
Ok(())
}

pub fn async_gc(&self, ctx: Context, safe_point: u64, callback: Callback<()>) -> Result<()> {
@@ -1206,6 +1210,19 @@ impl<E: Engine> GCWorker<E> {
})
.or_else(handle_gc_task_schedule_error)
}

pub fn change_io_limit(&self, limit: u64) -> Result<()> {
let mut limiter = self.limiter.lock().unwrap();
if limit == 0 {
limiter.take();
} else {
limiter
.get_or_insert_with(|| IOLimiter::new(limit))
.set_bytes_per_second(limit as i64);
}
info!("GC io limit changed"; "max_write_bytes_per_sec" => limit);
Ok(())
}
}

#[cfg(test)]
@@ -1734,4 +1751,42 @@ mod tests {
invalid_cfg.batch_keys = 0;
assert!(invalid_cfg.validate().is_err());
}

#[test]
fn test_change_io_limit() {
let engine = TestEngineBuilder::new().build().unwrap();
let mut gc_worker = GCWorker::new(engine, None, None, GCConfig::default());
gc_worker.start().unwrap();
assert!(gc_worker.limiter.lock().unwrap().is_none());

// Enable io limit
gc_worker.change_io_limit(1024).unwrap();
assert_eq!(
gc_worker
.limiter
.lock()
.unwrap()
.as_ref()
.unwrap()
.get_bytes_per_second(),
1024
);

// Change io limit
gc_worker.change_io_limit(2048).unwrap();
assert_eq!(
gc_worker
.limiter
.lock()
.unwrap()
.as_ref()
.unwrap()
.get_bytes_per_second(),
2048,
);

// Disable io limit
gc_worker.change_io_limit(0).unwrap();
assert!(gc_worker.limiter.lock().unwrap().is_none());
}
}
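
The heart of the change is moving the limiter out of GCRunner's private Option<IOLimiter> into a limiter shared behind Arc<Mutex<Option<IOLimiter>>>, so the debug path can create, retune, or drop it while GC keeps running and picks up the new setting on its next write batch. Below is a minimal sketch of that hot-swap pattern, assuming a ToyLimiter stand-in for IOLimiter; the change_io_limit function mirrors, but is not, the real GCWorker::change_io_limit.

use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for IOLimiter; only the two methods the patch relies on.
struct ToyLimiter {
    bytes_per_sec: AtomicI64,
}

impl ToyLimiter {
    fn new(limit: u64) -> Self {
        ToyLimiter { bytes_per_sec: AtomicI64::new(limit as i64) }
    }
    fn set_bytes_per_second(&self, limit: i64) {
        self.bytes_per_sec.store(limit, Ordering::Relaxed);
    }
    fn get_bytes_per_second(&self) -> i64 {
        self.bytes_per_sec.load(Ordering::Relaxed)
    }
}

// The shared handle: the GC runner and the config path both hold a clone of this Arc,
// so a change made through one is observed by the other under the same lock.
type SharedLimiter = Arc<Mutex<Option<ToyLimiter>>>;

fn change_io_limit(limiter: &SharedLimiter, limit: u64) {
    let mut guard = limiter.lock().unwrap();
    if limit == 0 {
        // Disabling the limit drops the limiter entirely, as in GCWorker::change_io_limit.
        guard.take();
    } else {
        guard
            .get_or_insert_with(|| ToyLimiter::new(limit))
            .set_bytes_per_second(limit as i64);
    }
}

fn main() {
    let limiter: SharedLimiter = Arc::new(Mutex::new(None));
    let runner_handle = limiter.clone(); // what GCRunner would hold

    change_io_limit(&limiter, 1024);
    assert_eq!(
        runner_handle.lock().unwrap().as_ref().unwrap().get_bytes_per_second(),
        1024
    );

    change_io_limit(&limiter, 0);
    assert!(runner_handle.lock().unwrap().is_none());
}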
2 changes: 1 addition & 1 deletion src/storage/mod.rs
@@ -630,7 +630,7 @@ pub struct Storage<E: Engine, L: LockMgr> {
read_pool: ReadPool,

/// Used to handle requests related to GC.
gc_worker: GCWorker<E>,
pub gc_worker: GCWorker<E>,

/// How many strong references. Thread pool and workers will be stopped
/// once there are no more references.
