Skip to content

Commit

Permalink
Merge pull request tikv#11 from gengliqi/raftstore-async-io-dev
Browse files Browse the repository at this point in the history
reduce apply io lock time
  • Loading branch information
innerr committed Feb 14, 2021
2 parents c851e9f + a49b6b8 commit f0f2377
Show file tree
Hide file tree
Showing 13 changed files with 308 additions and 235 deletions.
4 changes: 4 additions & 0 deletions components/engine_panic/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ impl WriteBatch<PanicEngine> for PanicWriteBatch {
// `PanicEngine` is a compile-only placeholder engine: every operation aborts
// at runtime. This stub exists solely so the trait is fully implemented.
fn write_to_engine(&self, _: &PanicEngine, _: &WriteOptions) -> Result<()> {
panic!()
}

// Placeholder for the new `WriteBatch::append` trait method; `PanicEngine`
// never executes, so this unconditionally aborts like its siblings.
fn append(&mut self, _: &mut Self) {
panic!()
}
}

impl Mutable for PanicWriteBatch {
Expand Down
40 changes: 40 additions & 0 deletions components/engine_rocks/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatch {
// Commits this batch atomically to the RocksDB instance by delegating to
// the engine's `write_opt`, forwarding the caller's write options unchanged.
fn write_to_engine(&self, e: &RocksEngine, opts: &WriteOptions) -> Result<()> {
e.write_opt(self, opts)
}

fn append(&mut self, _: &mut Self) {
// Deliberately unimplemented for the single-batch `RocksWriteBatch`;
// only the multi-batch `RocksWriteBatchVec` supports `append` so far.
// NOTE(review): the panic message below is runtime behavior — callers
// hitting this path fail loudly rather than silently dropping writes.
panic!("append is not implemented for write batch");
}
}

impl Mutable for RocksWriteBatch {
Expand Down Expand Up @@ -200,6 +205,37 @@ impl RocksWriteBatchVec {
self.db.as_ref()
}

/// Moves all buffered batches from `src` into `self`, then resets `src`
/// to a reusable empty state (one fresh `RawWriteBatch`, no save points,
/// zeroed counters).
///
/// Invariants assumed (not checked here):
/// - `index` points at the last *active* batch in `wbs`; entries past
///   `index` are presumably pre-allocated spares — TODO confirm against
///   the rest of this type.
/// - `save_points` presumably holds indices into `wbs`; they are rebased
///   when appended onto a non-empty destination — verify against callers.
pub fn append(&mut self, src: &mut RocksWriteBatchVec) {
if src.is_empty() {
// Nothing to move; leave both batches untouched.
return;
}
if self.is_empty() {
// Fast path: steal src's buffers wholesale instead of copying.
// `mem::take` leaves empty vectors behind in `src`.
self.wbs = std::mem::take(&mut src.wbs);
self.save_points = std::mem::take(&mut src.save_points);
self.index = src.index;
self.cur_batch_size = src.cur_batch_size;
} else {
// `len` = number of batches already active in self; src's save
// points must be shifted by this amount to stay valid.
let len = self.index + 1;
let mut src_wbs = std::mem::take(&mut src.wbs);
for (i, wb) in src_wbs.drain(..).enumerate() {
// Only the active batches [0, src.index] are moved; any spare
// batches beyond src.index are dropped with `src_wbs`.
if i > src.index {
break;
}
self.wbs.push(wb);
self.index += 1;
}
for p in &src.save_points {
self.save_points.push(*p + len);
}
// The current (last) batch now comes from src, so adopt its size.
self.cur_batch_size = src.cur_batch_size;
}
// Reset src to a valid empty state: the type presumably requires at
// least one batch in `wbs` at all times — TODO confirm invariant.
src.wbs.push(RawWriteBatch::default());
src.save_points.clear();
src.index = 0;
src.cur_batch_size = 0;
}

/// `check_switch_batch` will split a large WriteBatch into many smaller ones. This is to avoid
/// a large WriteBatch blocking write_thread too long.
fn check_switch_batch(&mut self) {
Expand All @@ -222,6 +258,10 @@ impl engine_traits::WriteBatch<RocksEngine> for RocksWriteBatchVec {
// Commits the vector of batches via the engine's multi-batch write path
// (`write_vec_opt`), forwarding the caller's write options unchanged.
fn write_to_engine(&self, e: &RocksEngine, opts: &WriteOptions) -> Result<()> {
e.write_vec_opt(self, opts)
}

fn append(&mut self, src: &mut RocksWriteBatchVec) {
// Not infinite recursion: Rust resolves inherent methods before trait
// methods, so this delegates to the inherent
// `RocksWriteBatchVec::append` defined above.
self.append(src);
}
}

impl Mutable for RocksWriteBatchVec {
Expand Down
1 change: 1 addition & 0 deletions components/engine_traits/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ pub trait Mutable: Send {
/// An engine-agnostic batch of write operations that is applied atomically.
pub trait WriteBatch<E: WriteBatchExt + Sized>: Mutable {
    /// Creates an empty write batch sized for roughly `cap` bytes of data.
    fn with_capacity(e: &E, cap: usize) -> Self;
    /// Atomically commits the batch's contents to `e` using `opts`.
    fn write_to_engine(&self, e: &E, opts: &WriteOptions) -> Result<()>;
    /// Moves all operations from `src` into `self`, leaving `src` empty
    /// and reusable. (Parameter named `src` instead of `_` so the trait
    /// declaration documents the argument's role; implementations are
    /// free to keep ignoring it.)
    fn append(&mut self, src: &mut Self);
}
6 changes: 6 additions & 0 deletions components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ with_prefix!(prefix_store "store-");
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
#[config(skip)]
pub trigger_apply_io_bytes: u64,
#[config(skip)]
pub trigger_apply_io_keys: u64,
#[config(skip)]
pub cmd_batch: bool,
// minimizes disruption when a partitioned node rejoins the cluster by using a two phase election.
Expand Down Expand Up @@ -194,6 +198,8 @@ impl Default for Config {
fn default() -> Config {
let split_size = ReadableSize::mb(coprocessor::config::SPLIT_SIZE_MB);
Config {
trigger_apply_io_bytes: 1024,
trigger_apply_io_keys: 32,
cmd_batch: true,
prevote: true,
raftdb_path: String::new(),
Expand Down
Loading

0 comments on commit f0f2377

Please sign in to comment.