Skip to content

Commit

Permalink
prewrite only when keys are not exist (tikv#4085)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjinpeng87 authored and dcalvin committed Feb 22, 2019
1 parent e62ebab commit 72a10a1
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 22 deletions.
82 changes: 81 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/server/service/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,7 @@ fn future_prewrite<E: Engine>(
Op::Put => Mutation::Put((Key::from_raw(x.get_key()), x.take_value())),
Op::Del => Mutation::Delete(Key::from_raw(x.get_key())),
Op::Lock => Mutation::Lock(Key::from_raw(x.get_key())),
Op::Insert => Mutation::Insert((Key::from_raw(x.get_key()), x.take_value())),
_ => panic!("mismatch Op in prewrite mutations"),
})
.collect();
Expand Down Expand Up @@ -1614,6 +1615,11 @@ fn extract_key_error(err: &storage::Error) -> KeyError {
// for compatibility with older versions.
key_error.set_retryable(format!("{:?}", err));
}
storage::Error::Txn(TxnError::Mvcc(MvccError::AlreadyExist { ref key })) => {
let mut exist = AlreadyExist::new();
exist.set_key(key.clone());
key_error.set_already_exist(exist);
}
// failed in commit
storage::Error::Txn(TxnError::Mvcc(MvccError::TxnLockNotFound { .. })) => {
warn!("txn conflicts: {:?}", err);
Expand Down
5 changes: 4 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub enum Mutation {
Put((Key, Value)),
Delete(Key),
Lock(Key),
Insert((Key, Value)), // has a constraint that key should not exist.
}

#[allow(clippy::match_same_arms)]
Expand All @@ -92,6 +93,7 @@ impl Mutation {
Mutation::Put((ref key, _)) => key,
Mutation::Delete(ref key) => key,
Mutation::Lock(ref key) => key,
Mutation::Insert((ref key, _)) => key,
}
}
}
Expand Down Expand Up @@ -357,7 +359,8 @@ impl Command {
Command::Prewrite { ref mutations, .. } => {
for m in mutations {
match *m {
Mutation::Put((ref key, ref value)) => {
Mutation::Put((ref key, ref value))
| Mutation::Insert((ref key, ref value)) => {
bytes += key.as_encoded().len();
bytes += value.len();
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mvcc/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const FLAG_LOCK: u8 = b'L';
impl LockType {
pub fn from_mutation(mutation: &Mutation) -> LockType {
match *mutation {
Mutation::Put(_) => LockType::Put,
Mutation::Put(_) | Mutation::Insert(_) => LockType::Put,
Mutation::Delete(_) => LockType::Delete,
Mutation::Lock(_) => LockType::Lock,
}
Expand Down
25 changes: 25 additions & 0 deletions src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ quick_error! {
display("write conflict, start_ts:{}, conflict_start_ts:{}, conflict_commit_ts:{}, key:{:?}, primary:{:?}",
start_ts, conflict_start_ts, conflict_commit_ts, escape(key), escape(primary))
}
AlreadyExist { key: Vec<u8> } {
description("already exists")
display("key {:?} already exists", escape(key))
}
KeyVersion {description("bad format key(version)")}
Other(err: Box<error::Error + Sync + Send>) {
from()
Expand Down Expand Up @@ -118,6 +122,7 @@ impl Error {
key: key.to_owned(),
primary: primary.to_owned(),
}),
Error::AlreadyExist { ref key } => Some(Error::AlreadyExist { key: key.clone() }),
Error::KeyVersion => Some(Error::KeyVersion),
Error::Committed { commit_ts } => Some(Error::Committed { commit_ts }),
Error::Io(_) | Error::Other(_) => None,
Expand Down Expand Up @@ -176,6 +181,26 @@ pub mod tests {
assert!(reader.get(&Key::from_raw(key), ts).is_err());
}

// Insert has a constraint that key should not exist
pub fn try_prewrite_insert<E: Engine>(
engine: &E,
key: &[u8],
value: &[u8],
pk: &[u8],
ts: u64,
) -> Result<()> {
let ctx = Context::new();
let snapshot = engine.snapshot(&ctx).unwrap();
let mut txn = MvccTxn::new(snapshot, ts, true).unwrap();
txn.prewrite(
Mutation::Insert((Key::from_raw(key), value.to_vec())),
pk,
&Options::default(),
)?;
write(engine, &ctx, txn.into_modifies());
Ok(())
}

pub fn must_prewrite_put<E: Engine>(engine: &E, key: &[u8], value: &[u8], pk: &[u8], ts: u64) {
let ctx = Context::new();
let snapshot = engine.snapshot(&ctx).unwrap();
Expand Down
79 changes: 71 additions & 8 deletions src/storage/mvcc/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,24 @@ impl<S: Snapshot> MvccReader<S> {
IsolationLevel::SI => ts = self.check_lock(key, ts)?,
IsolationLevel::RC => {}
}
if let Some(mut write) = self.get_write(key, ts)? {
if write.short_value.is_some() {
if self.key_only {
return Ok(Some(vec![]));
}
return Ok(write.short_value.take());
}
return self.load_data(key, write.start_ts).map(Some);
}
Ok(None)
}

pub fn get_write(&mut self, key: &Key, mut ts: u64) -> Result<Option<Write>> {
loop {
match self.seek_write(key, ts)? {
Some((commit_ts, mut write)) => match write.write_type {
Some((commit_ts, write)) => match write.write_type {
WriteType::Put => {
if write.short_value.is_some() {
if self.key_only {
return Ok(Some(vec![]));
}
return Ok(write.short_value.take());
}
return self.load_data(key, write.start_ts).map(Some);
return Ok(Some(write));
}
WriteType::Delete => {
return Ok(None);
Expand Down Expand Up @@ -812,4 +819,60 @@ mod tests {
// `get_txn_commit_info(&key, 15)` stopped at `30_25 PUT`.
assert_eq!(seek_for_prev_new - seek_for_prev_old, 2);
}

#[test]
fn test_get_write() {
let path = TempDir::new("_test_storage_mvcc_reader_get_write").expect("");
let path = path.path().to_str().unwrap();
let region = make_region(1, vec![], vec![]);
let db = open_db(path, true);
let mut engine = RegionEngine::new(Arc::clone(&db), region.clone());

let (k, v) = (b"k", b"v");
let m = Mutation::Put((Key::from_raw(k), v.to_vec()));
engine.prewrite(m, k, 1);
engine.commit(k, 1, 2);

engine.rollback(k, 5);

engine.lock(k, 6, 7);

engine.delete(k, 8, 9);

let m = Mutation::Put((Key::from_raw(k), v.to_vec()));
engine.prewrite(m, k, 10);
engine.commit(k, 10, 11);

let m = Mutation::Put((Key::from_raw(k), v.to_vec()));
engine.prewrite(m, k, 12);

let snap = RegionSnapshot::from_raw(Arc::clone(&db), region.clone());
let mut reader = MvccReader::new(snap, None, false, None, None, IsolationLevel::SI);

// Let's assume `2_1 PUT` means a commit version with start ts is 1 and commit ts
// is 2.
// Commit versions: [11_10 PUT, 9_8 DELETE, 7_6 LOCK, 5_5 Rollback, 2_1 PUT].
let key = Key::from_raw(k);
let write = reader.get_write(&key, 2).unwrap().unwrap();
assert_eq!(write.write_type, WriteType::Put);
assert_eq!(write.start_ts, 1);

let write = reader.get_write(&key, 5).unwrap().unwrap();
assert_eq!(write.write_type, WriteType::Put);
assert_eq!(write.start_ts, 1);

let write = reader.get_write(&key, 7).unwrap().unwrap();
assert_eq!(write.write_type, WriteType::Put);
assert_eq!(write.start_ts, 1);

assert!(reader.get_write(&key, 9).unwrap().is_none());

let write = reader.get_write(&key, 11).unwrap().unwrap();
assert_eq!(write.write_type, WriteType::Put);
assert_eq!(write.start_ts, 10);

let write = reader.get_write(&key, 13).unwrap().unwrap();
assert_eq!(write.write_type, WriteType::Put);
assert_eq!(write.start_ts, 10);
}
}
Loading

0 comments on commit 72a10a1

Please sign in to comment.