Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: Keep pessimistic lock's TTL if it's greater than prewrite reques… #6056

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/storage/mvcc/mod.rs
Expand Up @@ -308,6 +308,22 @@ pub mod tests {
must_prewrite_put_impl(engine, key, value, pk, ts, is_pessimistic_lock, options);
}

pub fn must_pessimistic_prewrite_put_with_ttl<E: Engine>(
engine: &E,
key: &[u8],
value: &[u8],
pk: &[u8],
ts: u64,
for_update_ts: u64,
is_pessimistic_lock: bool,
lock_ttl: u64,
) {
let mut options = Options::default();
options.for_update_ts = for_update_ts;
options.lock_ttl = lock_ttl;
must_prewrite_put_impl(engine, key, value, pk, ts, is_pessimistic_lock, options);
}

fn must_prewrite_put_err_impl<E: Engine>(
engine: &E,
key: &[u8],
Expand Down Expand Up @@ -484,6 +500,20 @@ pub mod tests {
}
}

pub fn must_acquire_pessimistic_lock_with_ttl<E: Engine>(
engine: &E,
key: &[u8],
pk: &[u8],
start_ts: u64,
for_update_ts: u64,
ttl: u64,
) {
let mut options = Options::default();
options.for_update_ts = for_update_ts;
options.lock_ttl = ttl;
must_acquire_pessimistic_lock_impl(engine, key, pk, start_ts, options);
}

pub fn must_acquire_pessimistic_lock_err<E: Engine>(
engine: &E,
key: &[u8],
Expand Down Expand Up @@ -619,6 +649,15 @@ pub mod tests {
assert_ne!(lock.lock_type, LockType::Pessimistic);
}

pub fn must_locked_with_ttl<E: Engine>(engine: &E, key: &[u8], start_ts: u64, ttl: u64) {
let snapshot = engine.snapshot(&Context::default()).unwrap();
let mut reader = MvccReader::new(snapshot, None, true, None, None, IsolationLevel::SI);
let lock = reader.load_lock(&Key::from_raw(key)).unwrap().unwrap();
assert_eq!(lock.ts, start_ts);
assert_ne!(lock.lock_type, LockType::Pessimistic);
assert_eq!(lock.ttl, ttl);
}

pub fn must_pessimistic_locked<E: Engine>(
engine: &E,
key: &[u8],
Expand Down
71 changes: 63 additions & 8 deletions src/storage/mvcc/txn.rs
Expand Up @@ -98,13 +98,14 @@ impl<S: Snapshot> MvccTxn<S> {
lock_type: LockType,
primary: Vec<u8>,
short_value: Option<Value>,
lock_ttl: u64,
options: &Options,
) {
let lock = Lock::new(
lock_type,
primary,
self.start_ts,
options.lock_ttl,
lock_ttl,
short_value,
options.for_update_ts,
options.txn_size,
Expand Down Expand Up @@ -151,21 +152,22 @@ impl<S: Snapshot> MvccTxn<S> {
lock_type: LockType,
primary: Vec<u8>,
value: Option<Value>,
lock_ttl: u64,
options: &Options,
) {
if let Some(value) = value {
if is_short_value(&value) {
// If the value is short, embed it in Lock.
self.lock_key(key, lock_type, primary, Some(value), options);
self.lock_key(key, lock_type, primary, Some(value), lock_ttl, options);
} else {
// value is long
let ts = self.start_ts;
self.put_value(key.clone(), ts, value);

self.lock_key(key, lock_type, primary, None, options);
self.lock_key(key, lock_type, primary, None, lock_ttl, options);
}
} else {
self.lock_key(key, lock_type, primary, None, options);
self.lock_key(key, lock_type, primary, None, lock_ttl, options);
}
}

Expand Down Expand Up @@ -252,7 +254,14 @@ impl<S: Snapshot> MvccTxn<S> {
}
// Overwrite the lock with small for_update_ts
if for_update_ts > lock.for_update_ts {
self.lock_key(key, LockType::Pessimistic, primary.to_vec(), None, options);
self.lock_key(
key,
LockType::Pessimistic,
primary.to_vec(),
None,
options.lock_ttl,
options,
);
} else {
MVCC_DUPLICATE_CMD_COUNTER_VEC
.acquire_pessimistic_lock
Expand Down Expand Up @@ -308,7 +317,14 @@ impl<S: Snapshot> MvccTxn<S> {
self.check_data_constraint(should_not_exist, &write, commit_ts, &key)?;
}

self.lock_key(key, LockType::Pessimistic, primary.to_vec(), None, options);
self.lock_key(
key,
LockType::Pessimistic,
primary.to_vec(),
None,
options.lock_ttl,
options,
);

Ok(())
}
Expand All @@ -327,6 +343,7 @@ impl<S: Snapshot> MvccTxn<S> {
Mutation::Lock(key) => (key, None),
Mutation::Insert((key, value)) => (key, Some(value)),
};
let mut last_lock_ttl = 0;

if let Some(lock) = self.reader.load_lock(&key)? {
if lock.ts != self.start_ts {
Expand All @@ -352,6 +369,7 @@ impl<S: Snapshot> MvccTxn<S> {
return Ok(());
}
// The lock is pessimistic and owned by this txn, go through to overwrite it.
last_lock_ttl = lock.ttl;
}
} else if is_pessimistic_lock {
// Pessimistic lock does not exist, the transaction should be aborted.
Expand All @@ -368,7 +386,14 @@ impl<S: Snapshot> MvccTxn<S> {
}

// No need to check data constraint, it's resolved by pessimistic locks.
self.prewrite_key_value(key, lock_type, primary.to_vec(), value, options);
self.prewrite_key_value(
key,
lock_type,
primary.to_vec(),
value,
::std::cmp::max(last_lock_ttl, options.lock_ttl),
options,
);
Ok(())
}

Expand Down Expand Up @@ -425,7 +450,14 @@ impl<S: Snapshot> MvccTxn<S> {
}
}

self.prewrite_key_value(key, lock_type, primary.to_vec(), value, options);
self.prewrite_key_value(
key,
lock_type,
primary.to_vec(),
value,
options.lock_ttl,
options,
);
Ok(())
}

Expand Down Expand Up @@ -1628,6 +1660,29 @@ mod tests {
must_get_rollback_ts(&engine, k, 70);
}

#[test]
fn test_pessimistic_txn_ttl() {
let engine = TestEngineBuilder::new().build().unwrap();

let (k, v) = (b"k", b"v");

// Pessimistic prewrite keeps the larger TTL of the prewrite request and the original
// pessimisitic lock.
must_acquire_pessimistic_lock_with_ttl(&engine, k, k, 10, 10, 100);
must_pessimistic_locked(&engine, k, 10, 10);
must_pessimistic_prewrite_put_with_ttl(&engine, k, v, k, 10, 10, true, 110);
must_locked_with_ttl(&engine, k, 10, 110);

must_rollback(&engine, k, 10);

// TTL not changed if the pessimistic lock's TTL is larger than that provided in the
// prewrite request.
must_acquire_pessimistic_lock_with_ttl(&engine, k, k, 20, 20, 100);
must_pessimistic_locked(&engine, k, 20, 20);
must_pessimistic_prewrite_put_with_ttl(&engine, k, v, k, 20, 20, true, 90);
must_locked_with_ttl(&engine, k, 20, 100);
}

#[test]
fn test_non_pessimistic_lock_conflict() {
let engine = TestEngineBuilder::new().build().unwrap();
Expand Down