Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
raftstore: Use multi-batch-write for apply (tikv#7111) (tikv#7718)
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
  • Loading branch information
Little-Wallace committed May 14, 2020
1 parent 2e06572 commit 981ca2c
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 146 deletions.
18 changes: 17 additions & 1 deletion components/engine_panic/src/write_batch.rs
Expand Up @@ -5,16 +5,29 @@ use engine_traits::{Mutable, Result, WriteBatch, WriteBatchExt, WriteOptions};

impl WriteBatchExt for PanicEngine {
type WriteBatch = PanicWriteBatch;
type WriteBatchVec = PanicWriteBatch;

fn write_opt(&self, wb: &Self::WriteBatch, opts: &WriteOptions) -> Result<()> {
panic!()
}

fn support_write_batch_vec(&self) -> bool {
panic!()
}

fn write_vec_opt(&self, wb: &Self::WriteBatchVec, opts: &WriteOptions) -> Result<()> {
panic!()
}

fn write_batch(&self) -> Self::WriteBatch {
panic!()
}
fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch {
panic!()
}
fn write_batch_vec(&self, vec_size: usize, cap: usize) -> Self::WriteBatchVec {
panic!()
}
}

pub struct PanicWriteBatch;
Expand All @@ -29,10 +42,13 @@ impl WriteBatch for PanicWriteBatch {
fn is_empty(&self) -> bool {
panic!()
}
fn clear(&mut self) {
fn should_write_to_engine(&self) -> bool {
panic!()
}

fn clear(&mut self) {
panic!()
}
fn set_save_point(&mut self) {
panic!()
}
Expand Down
241 changes: 237 additions & 4 deletions components/engine_rocks/src/write_batch.rs
Expand Up @@ -4,13 +4,16 @@ use std::sync::Arc;

use crate::engine::RocksEngine;
use crate::options::RocksWriteOptions;
use engine_traits::{self, Error, Mutable, Result, WriteBatchExt, WriteOptions};
use crate::util::get_cf_handle;
use engine_traits::{self, Error, Mutable, Result, WriteBatchExt, WriteBatchVecExt, WriteOptions};
use rocksdb::{Writable, WriteBatch as RawWriteBatch, DB};

use crate::util::get_cf_handle;
pub const WRITE_BATCH_MAX_KEYS: usize = 256;
pub const WRITE_BATCH_MAX_BATCH: usize = 16;

impl WriteBatchExt for RocksEngine {
type WriteBatch = RocksWriteBatch;
type WriteBatchVec = RocksWriteBatchVec;

fn write_opt(&self, wb: &Self::WriteBatch, opts: &WriteOptions) -> Result<()> {
debug_assert_eq!(
Expand All @@ -24,13 +27,35 @@ impl WriteBatchExt for RocksEngine {
.map_err(Error::Engine)
}

fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch {
Self::WriteBatch::with_capacity(Arc::clone(&self.as_inner()), cap)
fn write_vec_opt(&self, wb: &RocksWriteBatchVec, opts: &WriteOptions) -> Result<()> {
let opt: RocksWriteOptions = opts.into();
if wb.index > 0 {
self.as_inner()
.multi_batch_write(wb.as_inner(), &opt.into_raw())
.map_err(Error::Engine)
} else {
self.as_inner()
.write_opt(&wb.wbs[0], &opt.into_raw())
.map_err(Error::Engine)
}
}

fn support_write_batch_vec(&self) -> bool {
let options = self.as_inner().get_db_options();
options.is_enable_multi_batch_write()
}

fn write_batch(&self) -> Self::WriteBatch {
Self::WriteBatch::new(Arc::clone(&self.as_inner()))
}

fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch {
Self::WriteBatch::with_capacity(Arc::clone(&self.as_inner()), cap)
}

fn write_batch_vec(&self, vec_size: usize, cap: usize) -> Self::WriteBatchVec {
RocksWriteBatchVec::new(Arc::clone(&self.as_inner()), vec_size, cap)
}
}

pub struct RocksWriteBatch {
Expand Down Expand Up @@ -81,6 +106,10 @@ impl engine_traits::WriteBatch for RocksWriteBatch {
self.wb.is_empty()
}

fn should_write_to_engine(&self) -> bool {
self.wb.count() > WRITE_BATCH_MAX_KEYS
}

fn clear(&mut self) {
self.wb.clear();
}
Expand Down Expand Up @@ -124,3 +153,207 @@ impl Mutable for RocksWriteBatch {
.map_err(Error::Engine)
}
}

/// `RocksWriteBatchVec` is for method `multi_batch_write` of RocksDB, which splits a large WriteBatch
/// into many smaller ones and then any thread could help to deal with these small WriteBatch when it
/// is calling `AwaitState` and wait to become leader of WriteGroup. `multi_batch_write` will perform
/// much better than traditional `pipelined_write` when TiKV writes very large data into RocksDB. We
/// will remove this feature when `unordered_write` of RocksDB becomes more stable and becomes compatible
/// with Titan.
pub struct RocksWriteBatchVec {
db: Arc<DB>,
wbs: Vec<RawWriteBatch>,
save_points: Vec<usize>,
index: usize,
cur_batch_size: usize,
batch_size_limit: usize,
}

impl RocksWriteBatchVec {
pub fn new(db: Arc<DB>, batch_size_limit: usize, cap: usize) -> RocksWriteBatchVec {
let wb = RawWriteBatch::with_capacity(cap);
RocksWriteBatchVec {
db,
wbs: vec![wb],
save_points: vec![],
index: 0,
cur_batch_size: 0,
batch_size_limit,
}
}

pub fn as_inner(&self) -> &[RawWriteBatch] {
&self.wbs[0..=self.index]
}

pub fn as_raw(&self) -> &RawWriteBatch {
&self.wbs[0]
}

pub fn get_db(&self) -> &DB {
self.db.as_ref()
}

/// `check_switch_batch` will split a large WriteBatch into many smaller ones. This is to avoid
/// a large WriteBatch blocking write_thread too long.
fn check_switch_batch(&mut self) {
if self.batch_size_limit > 0 && self.cur_batch_size >= self.batch_size_limit {
self.index += 1;
self.cur_batch_size = 0;
if self.index >= self.wbs.len() {
self.wbs.push(RawWriteBatch::default());
}
}
self.cur_batch_size += 1;
}
}

impl engine_traits::WriteBatch for RocksWriteBatchVec {
fn data_size(&self) -> usize {
self.wbs.iter().fold(0, |a, b| a + b.data_size())
}

fn count(&self) -> usize {
self.cur_batch_size + self.index * self.batch_size_limit
}

fn is_empty(&self) -> bool {
self.wbs[0].is_empty()
}

fn should_write_to_engine(&self) -> bool {
self.index >= WRITE_BATCH_MAX_BATCH
}

fn clear(&mut self) {
for i in 0..=self.index {
self.wbs[i].clear();
}
self.save_points.clear();
self.index = 0;
self.cur_batch_size = 0;
}

fn set_save_point(&mut self) {
self.wbs[self.index].set_save_point();
self.save_points.push(self.index);
}

fn pop_save_point(&mut self) -> Result<()> {
if let Some(x) = self.save_points.pop() {
return self.wbs[x].pop_save_point().map_err(Error::Engine);
}
Err(Error::Engine("no save point".into()))
}

fn rollback_to_save_point(&mut self) -> Result<()> {
if let Some(x) = self.save_points.pop() {
for i in x + 1..=self.index {
self.wbs[i].clear();
}
self.index = x;
return self.wbs[x].rollback_to_save_point().map_err(Error::Engine);
}
Err(Error::Engine("no save point".into()))
}
}

impl Mutable for RocksWriteBatchVec {
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
self.check_switch_batch();
self.wbs[self.index].put(key, value).map_err(Error::Engine)
}

fn put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> {
self.check_switch_batch();
let handle = get_cf_handle(self.db.as_ref(), cf)?;
self.wbs[self.index]
.put_cf(handle, key, value)
.map_err(Error::Engine)
}

fn delete(&mut self, key: &[u8]) -> Result<()> {
self.check_switch_batch();
self.wbs[self.index].delete(key).map_err(Error::Engine)
}

fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> {
self.check_switch_batch();
let handle = get_cf_handle(self.db.as_ref(), cf)?;
self.wbs[self.index]
.delete_cf(handle, key)
.map_err(Error::Engine)
}

fn delete_range_cf(&mut self, cf: &str, begin_key: &[u8], end_key: &[u8]) -> Result<()> {
self.check_switch_batch();
let handle = get_cf_handle(self.db.as_ref(), cf)?;
self.wbs[self.index]
.delete_range_cf(handle, begin_key, end_key)
.map_err(Error::Engine)
}
}
impl WriteBatchVecExt<RocksEngine> for RocksWriteBatch {
fn write_batch_vec(e: &RocksEngine, _vec_size: usize, cap: usize) -> RocksWriteBatch {
e.write_batch_with_cap(cap)
}

fn write_to_engine(&self, e: &RocksEngine, opts: &WriteOptions) -> Result<()> {
e.write_opt(self, opts)
}
}

impl WriteBatchVecExt<RocksEngine> for RocksWriteBatchVec {
fn write_batch_vec(e: &RocksEngine, vec_size: usize, cap: usize) -> RocksWriteBatchVec {
e.write_batch_vec(vec_size, cap)
}

fn write_to_engine(&self, e: &RocksEngine, opts: &WriteOptions) -> Result<()> {
e.write_vec_opt(self, opts)
}
}

#[cfg(test)]
mod tests {
use super::super::util::new_engine_opt;
use super::super::RocksDBOptions;
use super::*;
use engine_traits::WriteBatch;
use rocksdb::DBOptions as RawDBOptions;
use tempfile::Builder;

#[test]
fn test_should_write_to_engine() {
let path = Builder::new()
.prefix("test-should-write-to-engine")
.tempdir()
.unwrap();
let opt = RawDBOptions::default();
opt.enable_multi_batch_write(true);
opt.enable_unordered_write(false);
opt.enable_pipelined_write(true);
let engine = new_engine_opt(
path.path().join("db").to_str().unwrap(),
RocksDBOptions::from_raw(opt),
vec![],
)
.unwrap();
assert!(engine.support_write_batch_vec());
let mut wb = engine.write_batch();
for _i in 0..WRITE_BATCH_MAX_KEYS {
wb.put(b"aaa", b"bbb").unwrap();
}
assert!(!wb.should_write_to_engine());
wb.put(b"aaa", b"bbb").unwrap();
assert!(wb.should_write_to_engine());
let mut wb = engine.write_batch_vec(4, 1024);
for _i in 0..WRITE_BATCH_MAX_BATCH * 4 {
wb.put(b"aaa", b"bbb").unwrap();
}
assert!(!wb.should_write_to_engine());
wb.put(b"aaa", b"bbb").unwrap();
assert!(wb.should_write_to_engine());
wb.clear();
assert!(!wb.should_write_to_engine());
}
}
5 changes: 5 additions & 0 deletions components/engine_traits/src/engine.rs
Expand Up @@ -36,3 +36,8 @@ pub trait KvEngine:
/// It cannot be used forever.
fn bad_downcast<T: 'static>(&self) -> &T;
}

pub trait WriteBatchVecExt<E: KvEngine> {
fn write_batch_vec(e: &E, vec_size: usize, cap: usize) -> Self;
fn write_to_engine(&self, e: &E, opts: &WriteOptions) -> Result<()>;
}
9 changes: 8 additions & 1 deletion components/engine_traits/src/write_batch.rs
Expand Up @@ -6,21 +6,28 @@ use crate::options::WriteOptions;

pub trait WriteBatchExt {
type WriteBatch: WriteBatch;
/// `WriteBatchVec` is used for `multi_batch_write` of RocksEngine and other Engine could also
/// implement another kind of WriteBatch according to their needs.
type WriteBatchVec: WriteBatch;

fn write_opt(&self, wb: &Self::WriteBatch, opts: &WriteOptions) -> Result<()>;
fn write_vec_opt(&self, wb: &Self::WriteBatchVec, opts: &WriteOptions) -> Result<()>;
fn support_write_batch_vec(&self) -> bool;
fn write(&self, wb: &Self::WriteBatch) -> Result<()> {
self.write_opt(wb, &WriteOptions::default())
}
fn write_batch(&self) -> Self::WriteBatch;
fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch;
fn write_batch_vec(&self, vec_size: usize, cap: usize) -> Self::WriteBatchVec;
}

pub trait WriteBatch: Mutable + Send {
fn data_size(&self) -> usize;
fn count(&self) -> usize;
fn is_empty(&self) -> bool;
fn clear(&mut self);
fn should_write_to_engine(&self) -> bool;

fn clear(&mut self);
fn set_save_point(&mut self);
fn pop_save_point(&mut self) -> Result<()>;
fn rollback_to_save_point(&mut self) -> Result<()>;
Expand Down

0 comments on commit 981ca2c

Please sign in to comment.