Skip to content

Commit

Permalink
txn: Keep pessimistic lock's TTL if it's greater than prewrite… (#6056)…
Browse files Browse the repository at this point in the history
… (#6059)

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta authored and sre-bot committed Nov 29, 2019
1 parent f1aacba commit 9b87615
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 8 deletions.
39 changes: 39 additions & 0 deletions src/storage/mvcc/mod.rs
Expand Up @@ -307,6 +307,22 @@ pub mod tests {
must_prewrite_put_impl(engine, key, value, pk, ts, is_pessimistic_lock, options);
}

pub fn must_pessimistic_prewrite_put_with_ttl<E: Engine>(
engine: &E,
key: &[u8],
value: &[u8],
pk: &[u8],
ts: u64,
for_update_ts: u64,
is_pessimistic_lock: bool,
lock_ttl: u64,
) {
let mut options = Options::default();
options.for_update_ts = for_update_ts;
options.lock_ttl = lock_ttl;
must_prewrite_put_impl(engine, key, value, pk, ts, is_pessimistic_lock, options);
}

fn must_prewrite_put_err_impl<E: Engine>(
engine: &E,
key: &[u8],
Expand Down Expand Up @@ -483,6 +499,20 @@ pub mod tests {
}
}

pub fn must_acquire_pessimistic_lock_with_ttl<E: Engine>(
engine: &E,
key: &[u8],
pk: &[u8],
start_ts: u64,
for_update_ts: u64,
ttl: u64,
) {
let mut options = Options::default();
options.for_update_ts = for_update_ts;
options.lock_ttl = ttl;
must_acquire_pessimistic_lock_impl(engine, key, pk, start_ts, options);
}

pub fn must_acquire_pessimistic_lock_err<E: Engine>(
engine: &E,
key: &[u8],
Expand Down Expand Up @@ -618,6 +648,15 @@ pub mod tests {
assert_ne!(lock.lock_type, LockType::Pessimistic);
}

pub fn must_locked_with_ttl<E: Engine>(engine: &E, key: &[u8], start_ts: u64, ttl: u64) {
let snapshot = engine.snapshot(&Context::default()).unwrap();
let mut reader = MvccReader::new(snapshot, None, true, None, None, IsolationLevel::SI);
let lock = reader.load_lock(&Key::from_raw(key)).unwrap().unwrap();
assert_eq!(lock.ts, start_ts);
assert_ne!(lock.lock_type, LockType::Pessimistic);
assert_eq!(lock.ttl, ttl);
}

pub fn must_pessimistic_locked<E: Engine>(
engine: &E,
key: &[u8],
Expand Down
71 changes: 63 additions & 8 deletions src/storage/mvcc/txn.rs
Expand Up @@ -98,13 +98,14 @@ impl<S: Snapshot> MvccTxn<S> {
lock_type: LockType,
primary: Vec<u8>,
short_value: Option<Value>,
lock_ttl: u64,
options: &Options,
) {
let lock = Lock::new(
lock_type,
primary,
self.start_ts,
options.lock_ttl,
lock_ttl,
short_value,
options.for_update_ts,
options.txn_size,
Expand Down Expand Up @@ -151,21 +152,22 @@ impl<S: Snapshot> MvccTxn<S> {
lock_type: LockType,
primary: Vec<u8>,
value: Option<Value>,
lock_ttl: u64,
options: &Options,
) {
if let Some(value) = value {
if is_short_value(&value) {
// If the value is short, embed it in Lock.
self.lock_key(key, lock_type, primary, Some(value), options);
self.lock_key(key, lock_type, primary, Some(value), lock_ttl, options);
} else {
// value is long
let ts = self.start_ts;
self.put_value(key.clone(), ts, value);

self.lock_key(key, lock_type, primary, None, options);
self.lock_key(key, lock_type, primary, None, lock_ttl, options);
}
} else {
self.lock_key(key, lock_type, primary, None, options);
self.lock_key(key, lock_type, primary, None, lock_ttl, options);
}
}

Expand Down Expand Up @@ -252,7 +254,14 @@ impl<S: Snapshot> MvccTxn<S> {
}
// Overwrite the lock with small for_update_ts
if for_update_ts > lock.for_update_ts {
self.lock_key(key, LockType::Pessimistic, primary.to_vec(), None, options);
self.lock_key(
key,
LockType::Pessimistic,
primary.to_vec(),
None,
options.lock_ttl,
options,
);
} else {
MVCC_DUPLICATE_CMD_COUNTER_VEC
.acquire_pessimistic_lock
Expand Down Expand Up @@ -308,7 +317,14 @@ impl<S: Snapshot> MvccTxn<S> {
self.check_data_constraint(should_not_exist, &write, commit_ts, &key)?;
}

self.lock_key(key, LockType::Pessimistic, primary.to_vec(), None, options);
self.lock_key(
key,
LockType::Pessimistic,
primary.to_vec(),
None,
options.lock_ttl,
options,
);

Ok(())
}
Expand All @@ -327,6 +343,7 @@ impl<S: Snapshot> MvccTxn<S> {
Mutation::Lock(key) => (key, None),
Mutation::Insert((key, value)) => (key, Some(value)),
};
let mut last_lock_ttl = 0;

if let Some(lock) = self.reader.load_lock(&key)? {
if lock.ts != self.start_ts {
Expand All @@ -352,6 +369,7 @@ impl<S: Snapshot> MvccTxn<S> {
return Ok(());
}
// The lock is pessimistic and owned by this txn, go through to overwrite it.
last_lock_ttl = lock.ttl;
}
} else if is_pessimistic_lock {
// Pessimistic lock does not exist, the transaction should be aborted.
Expand All @@ -368,7 +386,14 @@ impl<S: Snapshot> MvccTxn<S> {
}

// No need to check data constraint, it's resolved by pessimistic locks.
self.prewrite_key_value(key, lock_type, primary.to_vec(), value, options);
self.prewrite_key_value(
key,
lock_type,
primary.to_vec(),
value,
::std::cmp::max(last_lock_ttl, options.lock_ttl),
options,
);
Ok(())
}

Expand Down Expand Up @@ -425,7 +450,14 @@ impl<S: Snapshot> MvccTxn<S> {
}
}

self.prewrite_key_value(key, lock_type, primary.to_vec(), value, options);
self.prewrite_key_value(
key,
lock_type,
primary.to_vec(),
value,
options.lock_ttl,
options,
);
Ok(())
}

Expand Down Expand Up @@ -1628,6 +1660,29 @@ mod tests {
must_get_rollback_ts(&engine, k, 70);
}

#[test]
fn test_pessimistic_txn_ttl() {
let engine = TestEngineBuilder::new().build().unwrap();

let (k, v) = (b"k", b"v");

// Pessimistic prewrite keeps the larger TTL of the prewrite request and the original
// pessimisitic lock.
must_acquire_pessimistic_lock_with_ttl(&engine, k, k, 10, 10, 100);
must_pessimistic_locked(&engine, k, 10, 10);
must_pessimistic_prewrite_put_with_ttl(&engine, k, v, k, 10, 10, true, 110);
must_locked_with_ttl(&engine, k, 10, 110);

must_rollback(&engine, k, 10);

// TTL not changed if the pessimistic lock's TTL is larger than that provided in the
// prewrite request.
must_acquire_pessimistic_lock_with_ttl(&engine, k, k, 20, 20, 100);
must_pessimistic_locked(&engine, k, 20, 20);
must_pessimistic_prewrite_put_with_ttl(&engine, k, v, k, 20, 20, true, 90);
must_locked_with_ttl(&engine, k, 20, 100);
}

#[test]
fn test_non_pessimistic_lock_conflict() {
let engine = TestEngineBuilder::new().build().unwrap();
Expand Down

0 comments on commit 9b87615

Please sign in to comment.