Skip to content

Commit

Permalink
txn: fix min_commit_ts calculation in prewrite (tikv#8672)
Browse files Browse the repository at this point in the history
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
sticnarf committed Sep 16, 2020
1 parent c94cefa commit 8b1b2e6
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 2 deletions.
78 changes: 78 additions & 0 deletions src/storage/mod.rs
Expand Up @@ -5412,4 +5412,82 @@ mod tests {
let key_error = extract_key_error(&res[1].as_ref().unwrap_err());
assert_eq!(key_error.get_locked().get_key(), b"key");
}

#[test]
fn test_async_commit_prewrite() {
let storage = TestStorageBuilder::new(DummyLockManager {})
.build()
.unwrap();
let cm = storage.concurrency_manager.clone();
cm.update_max_ts(10.into());

// Optimistic prewrite
let (tx, rx) = channel();
storage
.sched_txn_command(
commands::Prewrite::new(
vec![
Mutation::Put((Key::from_raw(b"a"), b"v".to_vec())),
Mutation::Put((Key::from_raw(b"b"), b"v".to_vec())),
Mutation::Put((Key::from_raw(b"c"), b"v".to_vec())),
],
b"c".to_vec(),
100.into(),
1000,
false,
3,
TimeStamp::default(),
Some(vec![b"a".to_vec(), b"b".to_vec()]),
Context::default(),
),
Box::new(move |res| {
tx.send(res).unwrap();
}),
)
.unwrap();
let res = rx.recv().unwrap().unwrap();
assert_eq!(res.min_commit_ts, 101.into());

// Pessimistic prewrite
let (tx, rx) = channel();
storage
.sched_txn_command(
new_acquire_pessimistic_lock_command(
vec![(Key::from_raw(b"d"), false), (Key::from_raw(b"e"), false)],
200,
300,
false,
),
expect_ok_callback(tx, 0),
)
.unwrap();
rx.recv().unwrap();

cm.update_max_ts(1000.into());

let (tx, rx) = channel();
storage
.sched_txn_command(
commands::PrewritePessimistic::new(
vec![
(Mutation::Put((Key::from_raw(b"d"), b"v".to_vec())), true),
(Mutation::Put((Key::from_raw(b"e"), b"v".to_vec())), true),
],
b"d".to_vec(),
200.into(),
1000,
400.into(),
2,
401.into(),
Some(vec![b"e".to_vec()]),
Context::default(),
),
Box::new(move |res| {
tx.send(res).unwrap();
}),
)
.unwrap();
let res = rx.recv().unwrap().unwrap();
assert_eq!(res.min_commit_ts, 1001.into());
}
}
2 changes: 1 addition & 1 deletion src/storage/txn/commands/prewrite.rs
Expand Up @@ -205,7 +205,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for Prewrite {
self.min_commit_ts,
) {
Ok(ts) => {
if secondaries.is_some() && async_commit_ts > ts {
if secondaries.is_some() && async_commit_ts < ts {
async_commit_ts = ts;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/commands/prewrite_pessimistic.rs
Expand Up @@ -121,7 +121,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for PrewritePessimistic {
context.pipelined_pessimistic_lock,
) {
Ok(ts) => {
if secondaries.is_some() && async_commit_ts > ts {
if secondaries.is_some() && async_commit_ts < ts {
async_commit_ts = ts;
}
}
Expand Down

0 comments on commit 8b1b2e6

Please sign in to comment.