Skip to content

Commit

Permalink
add test for async commit
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Apr 19, 2024
1 parent 7913d31 commit 15712b8
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 0 deletions.
107 changes: 107 additions & 0 deletions proxy_tests/proxy/shared/replica_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ fn test_raft_cmd_request_cant_advanve_max_ts() {
fail::remove("on_pre_write_apply_state")
}

// https://github.com/tikv/tikv/pull/8669/files
#[test]
fn test_raft_cmd_request_learner_advanve_max_ts() {
use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse};
Expand Down Expand Up @@ -639,3 +640,109 @@ fn test_raft_message_can_advanve_max_ts() {
cluster.shutdown();
fail::remove("on_pre_write_apply_state")
}

#[test]
fn test_concurrent_update_maxts_and_commit() {
use kvproto::{
kvrpcpb::{Mutation, Op},
raft_cmdpb::{ReadIndexRequest, ReadIndexResponse},
};
use test_raftstore::{
must_kv_commit, must_kv_prewrite, must_kv_prewrite_with, must_kv_read_equal, new_mutation,
};
let mut cluster = new_server_cluster(0, 1);
cluster.run();

let cm = cluster.sim.read().unwrap().get_concurrency_manager(1);

let region = cluster.get_region(b"");
let leader = region.get_peers()[0].clone();
let follower = new_learner_peer(2, 2);
let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);
let client = TikvClient::new(channel);

let region_id = leader.get_id();

let mut ctx = Context::default();
ctx.set_region_id(region_id);
ctx.set_peer(leader.clone());
ctx.set_region_epoch(region.get_region_epoch().clone());

let read_index = |ranges: &[(&[u8], &[u8])], start_ts: u64| {
let mut m = raft::eraftpb::Message::default();
m.set_msg_type(MessageType::MsgReadIndex);
let mut read_index_req = ReadIndexRequest::default();
read_index_req.set_start_ts(start_ts);
for &(start_key, end_key) in ranges {
let mut range = KeyRange::default();
range.set_start_key(start_key.to_vec());
range.set_end_key(end_key.to_vec());
read_index_req.mut_key_ranges().push(range);
}

let rctx = ReadIndexContext {
id: Uuid::new_v4(),
request: Some(read_index_req),
locked: None,
};
let mut e = raft::eraftpb::Entry::default();
e.set_data(rctx.to_bytes().into());
m.mut_entries().push(e);
m.set_from(2);

let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default();
raft_msg.set_region_id(region.get_id());
raft_msg.set_from_peer(follower);
raft_msg.set_to_peer(leader);
raft_msg.set_region_epoch(region.get_region_epoch().clone());
raft_msg.set_message(m);
cluster.send_raft_msg(raft_msg).unwrap();

(ReadIndexResponse::default(), start_ts)
};

// let (k, v) = (b"k1".to_vec(), b"k2".to_vec());
// let mut mutation = Mutation::default();
// mutation.set_op(Op::Put);
// mutation.set_key(k.clone());
// mutation.set_value(v);
// must_kv_prewrite(&client, ctx.clone(), vec![mutation], k.clone(), 10);

// let block_duration = Duration::from_millis(300);
// let client_clone = client.clone();
// let ctx_clone = ctx.clone();
// let k_clone = k.clone();
// let handle = std::thread::spawn(move || {
// std::thread::sleep(block_duration);
// info!("!!!!!! ZZZZ must commit");
// must_kv_commit(&client_clone, ctx_clone, vec![k_clone], 10, 30, 100);
// });
let cli = {
let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);
TikvClient::new(channel)
};

must_kv_prewrite_with(
&cli,
ctx.clone(),
vec![new_mutation(Op::Put, &b"key2"[..], &b"value1"[..])],
vec![],
b"key2".to_vec(),
10,
0,
true,
false,
);

let (resp, start_ts) = read_index(&[(b"a", b"z")], 100);

std::thread::sleep(std::time::Duration::from_millis(10000));

// must_kv_commit(&cli, ctx.clone(), vec![b"key2".to_vec()], 10, 30, 100);
must_kv_read_equal(&cli, ctx.clone(), b"key2".to_vec(), b"value1".to_vec(), 100);
// handle.join().unwrap();
}
2 changes: 2 additions & 0 deletions src/server/raftkv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,11 +801,13 @@ impl ReadIndexObserver for ReplicaReadLockChecker {
return;
}
assert_eq!(msg.get_entries().len(), 1);
info!("!!!!!! ZZZZ update max_ts");
let mut rctx = ReadIndexContext::parse(msg.get_entries()[0].get_data()).unwrap();
if let Some(mut request) = rctx.request.take() {
let begin_instant = Instant::now();

let start_ts = request.get_start_ts().into();
info!("!!!!!! ZZZZ update max_ts to {}", start_ts);
self.concurrency_manager.update_max_ts(start_ts);
for range in request.mut_key_ranges().iter_mut() {
let key_bound = |key: Vec<u8>| {
Expand Down
1 change: 1 addition & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,7 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {

match &cmd {
Command::Prewrite(Prewrite { mutations, .. }) => {
info!("!!!!!! prewrite {:?}", mutations);
let keys = mutations.iter().map(|m| m.key().as_encoded());
Self::check_api_version(
self.api_version,
Expand Down

0 comments on commit 15712b8

Please sign in to comment.