diff --git a/src/server/gc_worker.rs b/src/server/gc_worker.rs index c53536c09b16..f7c949397fcf 100644 --- a/src/server/gc_worker.rs +++ b/src/server/gc_worker.rs @@ -263,7 +263,13 @@ impl GCRunner { mut next_scan_key: Option, ) -> Result> { let snapshot = self.get_snapshot(ctx)?; - let mut txn = MvccTxn::new(snapshot, TimeStamp::zero(), !ctx.get_not_fill_cache()).unwrap(); + let mut txn = MvccTxn::for_scan( + snapshot, + Some(ScanMode::Forward), + TimeStamp::zero(), + !ctx.get_not_fill_cache(), + ) + .unwrap(); for k in keys { let gc_info = txn.gc(k.clone(), safe_point)?; diff --git a/src/storage/mvcc/mod.rs b/src/storage/mvcc/mod.rs index 38d9aada35ad..562da9e5b288 100644 --- a/src/storage/mvcc/mod.rs +++ b/src/storage/mvcc/mod.rs @@ -896,7 +896,8 @@ pub mod tests { pub fn must_gc(engine: &E, key: &[u8], safe_point: impl Into) { let ctx = Context::default(); let snapshot = engine.snapshot(&ctx).unwrap(); - let mut txn = MvccTxn::new(snapshot, TimeStamp::zero(), true).unwrap(); + let mut txn = + MvccTxn::for_scan(snapshot, Some(ScanMode::Forward), TimeStamp::zero(), true).unwrap(); txn.gc(Key::from_raw(key), safe_point.into()).unwrap(); write(engine, &ctx, txn.into_modifies()); } diff --git a/src/storage/mvcc/reader/mod.rs b/src/storage/mvcc/reader/mod.rs index a063a4c9530f..c6dcc2425be3 100644 --- a/src/storage/mvcc/reader/mod.rs +++ b/src/storage/mvcc/reader/mod.rs @@ -7,4 +7,4 @@ mod scanner; pub use self::point_getter::{PointGetter, PointGetterBuilder}; pub use self::reader::MvccReader; pub use self::scanner::EntryScanner; -pub use self::scanner::{Scanner, ScannerBuilder}; +pub use self::scanner::{has_data_in_range, Scanner, ScannerBuilder}; diff --git a/src/storage/mvcc/reader/scanner/mod.rs b/src/storage/mvcc/reader/scanner/mod.rs index 363157e8e6bc..a963b660b3bf 100644 --- a/src/storage/mvcc/reader/scanner/mod.rs +++ b/src/storage/mvcc/reader/scanner/mod.rs @@ -4,7 +4,7 @@ mod backward; mod forward; mod txn_entry; -use engine::{CfName, CF_DEFAULT, CF_LOCK, CF_WRITE}; +use engine::{CfName, IterOption, CF_DEFAULT, CF_LOCK, CF_WRITE}; use kvproto::kvrpcpb::IsolationLevel; use self::backward::BackwardScanner; @@ -12,8 +12,8 @@ use self::forward::ForwardScanner; use crate::storage::mvcc::{default_not_found_error, Result, TimeStamp, TsSet}; use crate::storage::txn::Result as TxnResult; use crate::storage::{ - Cursor, CursorBuilder, Iterator, Key, ScanMode, Scanner as StoreScanner, Snapshot, Statistics, - Value, + CFStatistics, Cursor, CursorBuilder, Iterator, Key, ScanMode, Scanner as StoreScanner, + Snapshot, Statistics, Value, }; pub use self::txn_entry::Scanner as EntryScanner; @@ -254,6 +254,23 @@ where Ok(default_cursor.value(&mut statistics.data).to_vec()) } +pub fn has_data_in_range( + snapshot: S, + cf: CfName, + left: &Key, + right: &Key, + statistic: &mut CFStatistics, +) -> Result { + let iter_opt = IterOption::new(None, None, true); + let mut iter = snapshot.iter_cf(cf, iter_opt, ScanMode::Forward)?; + if iter.seek(left, statistic)? { + if iter.key(statistic) < right.as_encoded().as_slice() { + return Ok(true); + } + } + Ok(false) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/storage/mvcc/txn.rs b/src/storage/mvcc/txn.rs index f5b42aef53e8..a04a9e8a8baf 100644 --- a/src/storage/mvcc/txn.rs +++ b/src/storage/mvcc/txn.rs @@ -23,7 +23,6 @@ pub struct GcInfo { pub struct MvccTxn { reader: MvccReader, - gc_reader: MvccReader, start_ts: TimeStamp, writes: Vec, write_size: usize, @@ -48,12 +47,24 @@ impl MvccTxn { // IsolationLevel is `Si`, actually the method we use in MvccTxn does not rely on // isolation level, so it can be any value. reader: MvccReader::new(snapshot.clone(), None, fill_cache, IsolationLevel::Si), - gc_reader: MvccReader::new( - snapshot, - Some(ScanMode::Forward), - fill_cache, - IsolationLevel::Si, - ), + start_ts, + writes: vec![], + write_size: 0, + collapse_rollback: true, + }) + } + pub fn for_scan( + snapshot: S, + scan_mode: Option, + start_ts: TimeStamp, + fill_cache: bool, + ) -> Result { + Ok(Self { + // Use `ScanMode::Forward` when gc or prewrite with multiple `Mutation::Insert`, + // which would seek less times. + // When `scan_mode` is `Some(ScanMode::Forward)`, all keys must be writte by + // in ascending order. + reader: MvccReader::new(snapshot.clone(), scan_mode, fill_cache, IsolationLevel::Si), start_ts, writes: vec![], write_size: 0, @@ -751,7 +762,7 @@ impl MvccTxn { let mut deleted_versions = 0; let mut latest_delete = None; let mut is_completed = true; - while let Some((commit, write)) = self.gc_reader.seek_write(&key, ts)? { + while let Some((commit, write)) = self.reader.seek_write(&key, ts)? { ts = commit.prev(); found_versions += 1; diff --git a/src/storage/txn/process.rs b/src/storage/txn/process.rs index 21f93ca7a6c1..16e50c2c32bf 100644 --- a/src/storage/txn/process.rs +++ b/src/storage/txn/process.rs @@ -12,8 +12,8 @@ use crate::storage::kv::with_tls_engine; use crate::storage::kv::{CbContext, Modify, Result as EngineResult}; use crate::storage::lock_manager::{self, Lock, LockManager}; use crate::storage::mvcc::{ - Error as MvccError, ErrorInner as MvccErrorInner, Lock as MvccLock, MvccReader, MvccTxn, - TimeStamp, Write, MAX_TXN_WRITE_SIZE, + has_data_in_range, Error as MvccError, ErrorInner as MvccErrorInner, Lock as MvccLock, + MvccReader, MvccTxn, TimeStamp, Write, MAX_TXN_WRITE_SIZE, }; use crate::storage::txn::{sched_pool::*, scheduler::Msg, Error, ErrorInner, Result}; use crate::storage::types::ProcessResult; @@ -22,9 +22,12 @@ use crate::storage::{ Command, CommandKind, Engine, Error as StorageError, ErrorInner as StorageErrorInner, Key, MvccInfo, Result as StorageResult, ScanMode, Snapshot, Statistics, TxnStatus, Value, }; +use engine::CF_WRITE; use tikv_util::collections::HashMap; use tikv_util::time::{Instant, SlowTimer}; +pub const FORWARD_MIN_MUTATIONS_NUM: usize = 12; + // To resolve a key, the write size is about 100~150 bytes, depending on key and value length. // The write batch will be around 32KB if we scan 256 keys each time. pub const RESOLVE_LOCK_BATCH_SIZE: usize = 256; @@ -475,15 +478,43 @@ fn process_write_impl( ) -> Result { let (pr, to_be_write, rows, ctx, lock_info) = match cmd.kind { CommandKind::Prewrite { - mutations, + mut mutations, primary, start_ts, - options, + mut options, .. } => { - let mut txn = MvccTxn::new(snapshot, start_ts, !cmd.ctx.get_not_fill_cache())?; - let mut locks = vec![]; + let mut scan_mode = None; let rows = mutations.len(); + if options.for_update_ts.is_zero() && rows > FORWARD_MIN_MUTATIONS_NUM { + mutations.sort_by(|a, b| a.key().cmp(b.key())); + let left_key = mutations.first().unwrap().key(); + let right_key = mutations + .last() + .unwrap() + .key() + .clone() + .append_ts(TimeStamp::zero()); + if !has_data_in_range( + snapshot.clone(), + CF_WRITE, + left_key, + &right_key, + &mut statistics.write, + )? { + // If there is no data in range, we could skip constraint check, and use Forward seek for CF_LOCK. + // Because in most instances, there won't be more than one transaction write the same key. Seek + // operation could skip nonexistent key in CF_LOCK. + options.skip_constraint_check = true; + scan_mode = Some(ScanMode::Forward) + } + } + let mut locks = vec![]; + let mut txn = if scan_mode.is_some() { + MvccTxn::for_scan(snapshot, scan_mode, start_ts, !cmd.ctx.get_not_fill_cache())? + } else { + MvccTxn::new(snapshot, start_ts, !cmd.ctx.get_not_fill_cache())? + }; // If `options.for_update_ts` is 0, the transaction is optimistic // or else pessimistic. @@ -892,6 +923,8 @@ fn find_mvcc_infos_by_key( #[cfg(test)] mod tests { use super::*; + use crate::storage::kv::{Snapshot, TestEngineBuilder}; + use crate::storage::{DummyLockManager, Mutation, Options}; #[test] fn test_extract_lock_from_result() { @@ -909,4 +942,174 @@ mod tests { assert_eq!(lock.ts, ts.into()); assert_eq!(lock.hash, key.gen_hash()); } + + fn inner_test_prewrite_skip_constraint_check(pri_key_number: u8, write_num: usize) { + let mut mutations = Vec::default(); + let pri_key = &[pri_key_number]; + for i in 0..write_num { + mutations.push(Mutation::Insert(( + Key::from_raw(&[i as u8]), + b"100".to_vec(), + ))); + } + let mut statistic = Statistics::default(); + let engine = TestEngineBuilder::new().build().unwrap(); + prewrite( + &engine, + &mut statistic, + vec![Mutation::Put(( + Key::from_raw(&[pri_key_number]), + b"100".to_vec(), + ))], + pri_key.to_vec(), + 99, + ) + .unwrap(); + assert_eq!(1, statistic.write.seek); + let e = prewrite( + &engine, + &mut statistic, + mutations.clone(), + pri_key.to_vec(), + 100, + ) + .err() + .unwrap(); + assert_eq!(2, statistic.write.seek); + match e { + Error(box ErrorInner::Mvcc(MvccError(box MvccErrorInner::KeyIsLocked(_)))) => (), + _ => panic!("error type not match"), + } + commit( + &engine, + &mut statistic, + vec![Key::from_raw(&[pri_key_number])], + 99, + 102, + ) + .unwrap(); + assert_eq!(2, statistic.write.seek); + let e = prewrite( + &engine, + &mut statistic, + mutations.clone(), + pri_key.to_vec(), + 101, + ) + .err() + .unwrap(); + match e { + Error(box ErrorInner::Mvcc(MvccError(box MvccErrorInner::WriteConflict { + .. + }))) => (), + _ => panic!("error type not match"), + } + let e = prewrite( + &engine, + &mut statistic, + mutations.clone(), + pri_key.to_vec(), + 104, + ) + .err() + .unwrap(); + match e { + Error(box ErrorInner::Mvcc(MvccError(box MvccErrorInner::AlreadyExist { .. }))) => (), + _ => panic!("error type not match"), + } + + statistic.write.seek = 0; + let ctx = Context::default(); + engine + .delete_cf( + &ctx, + CF_WRITE, + Key::from_raw(&[pri_key_number]).append_ts(102.into()), + ) + .unwrap(); + prewrite( + &engine, + &mut statistic, + mutations.clone(), + pri_key.to_vec(), + 104, + ) + .unwrap(); + // All keys are prewrited successful with only one seek operations. + assert_eq!(1, statistic.write.seek); + let keys: Vec = mutations.iter().map(|m| m.key().clone()).collect(); + commit(&engine, &mut statistic, keys.clone(), 104, 105).unwrap(); + let snap = engine.snapshot(&ctx).unwrap(); + for k in keys { + let v = snap.get_cf(CF_WRITE, &k.append_ts(105.into())).unwrap(); + assert!(v.is_some()); + } + } + + #[test] + fn test_prewrite_skip_constraint_check() { + inner_test_prewrite_skip_constraint_check(0, FORWARD_MIN_MUTATIONS_NUM + 1); + inner_test_prewrite_skip_constraint_check(5, FORWARD_MIN_MUTATIONS_NUM + 1); + inner_test_prewrite_skip_constraint_check( + FORWARD_MIN_MUTATIONS_NUM as u8, + FORWARD_MIN_MUTATIONS_NUM + 1, + ); + } + + fn prewrite( + engine: &E, + statistics: &mut Statistics, + mutations: Vec, + primary: Vec, + start_ts: u64, + ) -> Result<()> { + let ctx = Context::default(); + let snap = engine.snapshot(&ctx)?; + let cmd = Command { + ctx, + kind: CommandKind::Prewrite { + mutations, + primary, + start_ts: TimeStamp::from(start_ts), + options: Options::default(), + }, + }; + let m = DummyLockManager {}; + let ret = process_write_impl(cmd, snap, Some(m), statistics)?; + if let ProcessResult::MultiRes { results } = ret.pr { + if !results.is_empty() { + let info = LockInfo::default(); + return Err(Error::from(ErrorInner::Mvcc(MvccError::from( + MvccErrorInner::KeyIsLocked(info), + )))); + } + } + let ctx = Context::default(); + engine.write(&ctx, ret.to_be_write).unwrap(); + Ok(()) + } + + fn commit( + engine: &E, + statistics: &mut Statistics, + keys: Vec, + lock_ts: u64, + commit_ts: u64, + ) -> Result<()> { + let ctx = Context::default(); + let snap = engine.snapshot(&ctx)?; + let cmd = Command { + ctx, + kind: CommandKind::Commit { + keys, + lock_ts: TimeStamp::from(lock_ts), + commit_ts: TimeStamp::from(commit_ts), + }, + }; + let m = DummyLockManager {}; + let ret = process_write_impl(cmd, snap, Some(m), statistics)?; + let ctx = Context::default(); + engine.write(&ctx, ret.to_be_write).unwrap(); + Ok(()) + } }