Skip to content

Commit

Permalink
Optimize seek_write in prewrite (tikv#5846)
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored and wangweizhen committed Dec 1, 2019
1 parent b632011 commit 5015ede
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 20 deletions.
8 changes: 7 additions & 1 deletion src/server/gc_worker.rs
Expand Up @@ -263,7 +263,13 @@ impl<E: Engine> GCRunner<E> {
mut next_scan_key: Option<Key>,
) -> Result<Option<Key>> {
let snapshot = self.get_snapshot(ctx)?;
let mut txn = MvccTxn::new(snapshot, TimeStamp::zero(), !ctx.get_not_fill_cache()).unwrap();
let mut txn = MvccTxn::for_scan(
snapshot,
Some(ScanMode::Forward),
TimeStamp::zero(),
!ctx.get_not_fill_cache(),
)
.unwrap();
for k in keys {
let gc_info = txn.gc(k.clone(), safe_point)?;

Expand Down
3 changes: 2 additions & 1 deletion src/storage/mvcc/mod.rs
Expand Up @@ -896,7 +896,8 @@ pub mod tests {
/// Test helper: runs `gc` for `key` up to `safe_point` on a fresh snapshot of `engine`
/// and passes the transaction's resulting modifications to `write`.
///
/// Every fallible step is unwrapped, so any failure panics.
// NOTE(review): two consecutive `let mut txn` bindings consume the same `snapshot`;
// this looks like the removed and the added line of a diff shown together — confirm
// that only the `for_scan` form is meant to remain.
pub fn must_gc<E: Engine>(engine: &E, key: &[u8], safe_point: impl Into<TimeStamp>) {
let ctx = Context::default();
let snapshot = engine.snapshot(&ctx).unwrap();
let mut txn = MvccTxn::new(snapshot, TimeStamp::zero(), true).unwrap();
let mut txn =
MvccTxn::for_scan(snapshot, Some(ScanMode::Forward), TimeStamp::zero(), true).unwrap();
txn.gc(Key::from_raw(key), safe_point.into()).unwrap();
write(engine, &ctx, txn.into_modifies());
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mvcc/reader/mod.rs
Expand Up @@ -7,4 +7,4 @@ mod scanner;
pub use self::point_getter::{PointGetter, PointGetterBuilder};
pub use self::reader::MvccReader;
pub use self::scanner::EntryScanner;
pub use self::scanner::{Scanner, ScannerBuilder};
pub use self::scanner::{has_data_in_range, Scanner, ScannerBuilder};
23 changes: 20 additions & 3 deletions src/storage/mvcc/reader/scanner/mod.rs
Expand Up @@ -4,16 +4,16 @@ mod backward;
mod forward;
mod txn_entry;

use engine::{CfName, CF_DEFAULT, CF_LOCK, CF_WRITE};
use engine::{CfName, IterOption, CF_DEFAULT, CF_LOCK, CF_WRITE};
use kvproto::kvrpcpb::IsolationLevel;

use self::backward::BackwardScanner;
use self::forward::ForwardScanner;
use crate::storage::mvcc::{default_not_found_error, Result, TimeStamp, TsSet};
use crate::storage::txn::Result as TxnResult;
use crate::storage::{
Cursor, CursorBuilder, Iterator, Key, ScanMode, Scanner as StoreScanner, Snapshot, Statistics,
Value,
CFStatistics, Cursor, CursorBuilder, Iterator, Key, ScanMode, Scanner as StoreScanner,
Snapshot, Statistics, Value,
};

pub use self::txn_entry::Scanner as EntryScanner;
Expand Down Expand Up @@ -254,6 +254,23 @@ where
Ok(default_cursor.value(&mut statistics.data).to_vec())
}

/// Checks whether column family `cf` of `snapshot` contains an entry between `left`
/// and `right`.
///
/// A forward iterator over `cf` is sought to `left`. The result is `Ok(true)` only when
/// that seek lands on an entry whose key is strictly smaller than the encoded form of
/// `right`; otherwise it is `Ok(false)`. Iterator work is accounted in `statistic`.
///
/// # Errors
///
/// Propagates any error returned while creating the iterator or while seeking.
pub fn has_data_in_range<S: Snapshot>(
    snapshot: S,
    cf: CfName,
    left: &Key,
    right: &Key,
    statistic: &mut CFStatistics,
) -> Result<bool> {
    let mut cursor = snapshot.iter_cf(cf, IterOption::new(None, None, true), ScanMode::Forward)?;
    let found = cursor.seek(left, statistic)?;
    // The right-hand limit is checked explicitly; `&&` short-circuits so the key is only
    // read when the seek actually landed on an entry.
    Ok(found && cursor.key(statistic) < right.as_encoded().as_slice())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
27 changes: 19 additions & 8 deletions src/storage/mvcc/txn.rs
Expand Up @@ -23,7 +23,6 @@ pub struct GcInfo {

pub struct MvccTxn<S: Snapshot> {
reader: MvccReader<S>,
gc_reader: MvccReader<S>,
start_ts: TimeStamp,
writes: Vec<Modify>,
write_size: usize,
Expand All @@ -48,12 +47,24 @@ impl<S: Snapshot> MvccTxn<S> {
// IsolationLevel is `Si`, actually the method we use in MvccTxn does not rely on
// isolation level, so it can be any value.
reader: MvccReader::new(snapshot.clone(), None, fill_cache, IsolationLevel::Si),
gc_reader: MvccReader::new(
snapshot,
Some(ScanMode::Forward),
fill_cache,
IsolationLevel::Si,
),
start_ts,
writes: vec![],
write_size: 0,
collapse_rollback: true,
})
}
pub fn for_scan(
snapshot: S,
scan_mode: Option<ScanMode>,
start_ts: TimeStamp,
fill_cache: bool,
) -> Result<Self> {
Ok(Self {
// Use `ScanMode::Forward` for gc, or for a prewrite with multiple `Mutation::Insert`,
// so that fewer seeks are needed.
// When `scan_mode` is `Some(ScanMode::Forward)`, all keys must be written
// in ascending order.
reader: MvccReader::new(snapshot.clone(), scan_mode, fill_cache, IsolationLevel::Si),
start_ts,
writes: vec![],
write_size: 0,
Expand Down Expand Up @@ -751,7 +762,7 @@ impl<S: Snapshot> MvccTxn<S> {
let mut deleted_versions = 0;
let mut latest_delete = None;
let mut is_completed = true;
while let Some((commit, write)) = self.gc_reader.seek_write(&key, ts)? {
while let Some((commit, write)) = self.reader.seek_write(&key, ts)? {
ts = commit.prev();
found_versions += 1;

Expand Down
215 changes: 209 additions & 6 deletions src/storage/txn/process.rs
Expand Up @@ -12,8 +12,8 @@ use crate::storage::kv::with_tls_engine;
use crate::storage::kv::{CbContext, Modify, Result as EngineResult};
use crate::storage::lock_manager::{self, Lock, LockManager};
use crate::storage::mvcc::{
Error as MvccError, ErrorInner as MvccErrorInner, Lock as MvccLock, MvccReader, MvccTxn,
TimeStamp, Write, MAX_TXN_WRITE_SIZE,
has_data_in_range, Error as MvccError, ErrorInner as MvccErrorInner, Lock as MvccLock,
MvccReader, MvccTxn, TimeStamp, Write, MAX_TXN_WRITE_SIZE,
};
use crate::storage::txn::{sched_pool::*, scheduler::Msg, Error, ErrorInner, Result};
use crate::storage::types::ProcessResult;
Expand All @@ -22,9 +22,12 @@ use crate::storage::{
Command, CommandKind, Engine, Error as StorageError, ErrorInner as StorageErrorInner, Key,
MvccInfo, Result as StorageResult, ScanMode, Snapshot, Statistics, TxnStatus, Value,
};
use engine::CF_WRITE;
use tikv_util::collections::HashMap;
use tikv_util::time::{Instant, SlowTimer};

/// Mutation-count threshold for the forward-scan prewrite path.
///
/// A prewrite whose `for_update_ts` is zero and which carries more than this many
/// mutations has its mutations sorted by key and its key range probed with
/// `has_data_in_range`; if the range holds no data in `CF_WRITE`, the constraint check
/// is skipped and `ScanMode::Forward` is used.
pub const FORWARD_MIN_MUTATIONS_NUM: usize = 12;

// To resolve a key, the write size is about 100~150 bytes, depending on key and value length.
// The write batch will be around 32KB if we scan 256 keys each time.
pub const RESOLVE_LOCK_BATCH_SIZE: usize = 256;
Expand Down Expand Up @@ -475,15 +478,43 @@ fn process_write_impl<S: Snapshot, L: LockManager>(
) -> Result<WriteResult> {
let (pr, to_be_write, rows, ctx, lock_info) = match cmd.kind {
CommandKind::Prewrite {
mutations,
mut mutations,
primary,
start_ts,
options,
mut options,
..
} => {
let mut txn = MvccTxn::new(snapshot, start_ts, !cmd.ctx.get_not_fill_cache())?;
let mut locks = vec![];
let mut scan_mode = None;
let rows = mutations.len();
if options.for_update_ts.is_zero() && rows > FORWARD_MIN_MUTATIONS_NUM {
mutations.sort_by(|a, b| a.key().cmp(b.key()));
let left_key = mutations.first().unwrap().key();
let right_key = mutations
.last()
.unwrap()
.key()
.clone()
.append_ts(TimeStamp::zero());
if !has_data_in_range(
snapshot.clone(),
CF_WRITE,
left_key,
&right_key,
&mut statistics.write,
)? {
// If there is no data in range, we can skip the constraint check and use a forward seek for CF_LOCK.
// In most cases no more than one transaction writes the same key, so the seek
// operation can skip nonexistent keys in CF_LOCK.
options.skip_constraint_check = true;
scan_mode = Some(ScanMode::Forward)
}
}
let mut locks = vec![];
let mut txn = if scan_mode.is_some() {
MvccTxn::for_scan(snapshot, scan_mode, start_ts, !cmd.ctx.get_not_fill_cache())?
} else {
MvccTxn::new(snapshot, start_ts, !cmd.ctx.get_not_fill_cache())?
};

// If `options.for_update_ts` is 0, the transaction is optimistic
// or else pessimistic.
Expand Down Expand Up @@ -892,6 +923,8 @@ fn find_mvcc_infos_by_key<S: Snapshot>(
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::kv::{Snapshot, TestEngineBuilder};
use crate::storage::{DummyLockManager, Mutation, Options};

#[test]
fn test_extract_lock_from_result() {
Expand All @@ -909,4 +942,174 @@ mod tests {
assert_eq!(lock.ts, ts.into());
assert_eq!(lock.hash, key.gen_hash());
}

// Scenario driver for the skip-constraint-check prewrite path.
//
// `pri_key_number` is the single-byte key that is prewritten/committed up front and is
// also used as the primary of every later prewrite; `write_num` is how many
// `Mutation::Insert` entries (keys `[0]`, `[1]`, ... as single bytes) the batch contains.
// The same `statistic` is reused for every call, so `statistic.write.seek` is cumulative
// until it is reset explicitly below.
fn inner_test_prewrite_skip_constraint_check(pri_key_number: u8, write_num: usize) {
let mut mutations = Vec::default();
let pri_key = &[pri_key_number];
for i in 0..write_num {
mutations.push(Mutation::Insert((
Key::from_raw(&[i as u8]),
b"100".to_vec(),
)));
}
let mut statistic = Statistics::default();
let engine = TestEngineBuilder::new().build().unwrap();
// Step 1: prewrite a single `Put` on the primary key at start_ts 99.
prewrite(
&engine,
&mut statistic,
vec![Mutation::Put((
Key::from_raw(&[pri_key_number]),
b"100".to_vec(),
))],
pri_key.to_vec(),
99,
)
.unwrap();
assert_eq!(1, statistic.write.seek);
// Step 2: the insert batch at start_ts 100 must be rejected as locked, having cost
// exactly one more seek on the write statistics.
let e = prewrite(
&engine,
&mut statistic,
mutations.clone(),
pri_key.to_vec(),
100,
)
.err()
.unwrap();
assert_eq!(2, statistic.write.seek);
match e {
Error(box ErrorInner::Mvcc(MvccError(box MvccErrorInner::KeyIsLocked(_)))) => (),
_ => panic!("error type not match"),
}
// Step 3: commit the primary key (lock_ts 99, commit_ts 102); the seek counter is
// expected to stay unchanged.
commit(
&engine,
&mut statistic,
vec![Key::from_raw(&[pri_key_number])],
99,
102,
)
.unwrap();
assert_eq!(2, statistic.write.seek);
// Step 4: a batch with start_ts 101 (below the commit_ts 102 used above) must fail
// with `WriteConflict`.
let e = prewrite(
&engine,
&mut statistic,
mutations.clone(),
pri_key.to_vec(),
101,
)
.err()
.unwrap();
match e {
Error(box ErrorInner::Mvcc(MvccError(box MvccErrorInner::WriteConflict {
..
}))) => (),
_ => panic!("error type not match"),
}
// Step 5: a batch with start_ts 104 must fail with `AlreadyExist`.
let e = prewrite(
&engine,
&mut statistic,
mutations.clone(),
pri_key.to_vec(),
104,
)
.err()
.unwrap();
match e {
Error(box ErrorInner::Mvcc(MvccError(box MvccErrorInner::AlreadyExist { .. }))) => (),
_ => panic!("error type not match"),
}

// Step 6: reset the counter and delete the CF_WRITE entry of the primary key at
// ts 102 (the commit_ts used above) directly from the engine.
statistic.write.seek = 0;
let ctx = Context::default();
engine
.delete_cf(
&ctx,
CF_WRITE,
Key::from_raw(&[pri_key_number]).append_ts(102.into()),
)
.unwrap();
prewrite(
&engine,
&mut statistic,
mutations.clone(),
pri_key.to_vec(),
104,
)
.unwrap();
// All keys are prewritten successfully with only one seek operation.
assert_eq!(1, statistic.write.seek);
// Step 7: commit every key of the batch (lock_ts 104, commit_ts 105) and check that
// each one has a CF_WRITE entry at ts 105.
let keys: Vec<Key> = mutations.iter().map(|m| m.key().clone()).collect();
commit(&engine, &mut statistic, keys.clone(), 104, 105).unwrap();
let snap = engine.snapshot(&ctx).unwrap();
for k in keys {
let v = snap.get_cf(CF_WRITE, &k.append_ts(105.into())).unwrap();
assert!(v.is_some());
}
}

/// Runs the skip-constraint-check scenario with `FORWARD_MIN_MUTATIONS_NUM + 1` inserts,
/// placing the pre-existing primary key at the first key, at key `5`, and at the last
/// key of the inserted batch.
#[test]
fn test_prewrite_skip_constraint_check() {
    let write_num = FORWARD_MIN_MUTATIONS_NUM + 1;
    for &pri_key_number in &[0u8, 5, FORWARD_MIN_MUTATIONS_NUM as u8] {
        inner_test_prewrite_skip_constraint_check(pri_key_number, write_num);
    }
}

/// Test helper: processes a `Prewrite` command with default `Options` against a fresh
/// snapshot of `engine` and writes the resulting modifications back to the engine.
///
/// If the processing result is a non-empty `ProcessResult::MultiRes`, nothing is written
/// and a `KeyIsLocked` error carrying a default `LockInfo` is returned instead.
/// Errors from taking the snapshot or from `process_write_impl` are propagated; a
/// failure of the final engine write panics.
fn prewrite<E: Engine>(
    engine: &E,
    statistics: &mut Statistics,
    mutations: Vec<Mutation>,
    primary: Vec<u8>,
    start_ts: u64,
) -> Result<()> {
    let ctx = Context::default();
    let snapshot = engine.snapshot(&ctx)?;
    let kind = CommandKind::Prewrite {
        mutations,
        primary,
        start_ts: TimeStamp::from(start_ts),
        options: Options::default(),
    };
    let outcome = process_write_impl(
        Command { ctx, kind },
        snapshot,
        Some(DummyLockManager {}),
        statistics,
    )?;

    // Only a `MultiRes` with at least one entry is reported as a lock error.
    let reported_results = match outcome.pr {
        ProcessResult::MultiRes { ref results } => !results.is_empty(),
        _ => false,
    };
    if reported_results {
        return Err(Error::from(ErrorInner::Mvcc(MvccError::from(
            MvccErrorInner::KeyIsLocked(LockInfo::default()),
        ))));
    }

    engine
        .write(&Context::default(), outcome.to_be_write)
        .unwrap();
    Ok(())
}

/// Test helper: processes a `Commit` command for `keys` (locked at `lock_ts`, committed
/// at `commit_ts`) against a fresh snapshot of `engine` and writes the resulting
/// modifications back to the engine.
///
/// Errors from taking the snapshot or from `process_write_impl` are propagated; a
/// failure of the final engine write panics.
fn commit<E: Engine>(
    engine: &E,
    statistics: &mut Statistics,
    keys: Vec<Key>,
    lock_ts: u64,
    commit_ts: u64,
) -> Result<()> {
    let ctx = Context::default();
    let snapshot = engine.snapshot(&ctx)?;
    let kind = CommandKind::Commit {
        keys,
        lock_ts: TimeStamp::from(lock_ts),
        commit_ts: TimeStamp::from(commit_ts),
    };
    let outcome = process_write_impl(
        Command { ctx, kind },
        snapshot,
        Some(DummyLockManager {}),
        statistics,
    )?;
    engine
        .write(&Context::default(), outcome.to_be_write)
        .unwrap();
    Ok(())
}
}

0 comments on commit 5015ede

Please sign in to comment.