From 15712b87b64e51df93208f202a51ec0657a29590 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 19 Apr 2024 12:44:15 +0800 Subject: [PATCH] add test for async commit Signed-off-by: CalvinNeo --- proxy_tests/proxy/shared/replica_read.rs | 107 +++++++++++++++++++++++ src/server/raftkv/mod.rs | 2 + src/storage/mod.rs | 1 + 3 files changed, 110 insertions(+) diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs index 803acffc8da..0ef3c00e153 100644 --- a/proxy_tests/proxy/shared/replica_read.rs +++ b/proxy_tests/proxy/shared/replica_read.rs @@ -458,6 +458,7 @@ fn test_raft_cmd_request_cant_advanve_max_ts() { fail::remove("on_pre_write_apply_state") } +// https://github.com/tikv/tikv/pull/8669/files #[test] fn test_raft_cmd_request_learner_advanve_max_ts() { use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse}; @@ -639,3 +640,109 @@ fn test_raft_message_can_advanve_max_ts() { cluster.shutdown(); fail::remove("on_pre_write_apply_state") } + +#[test] +fn test_concurrent_update_maxts_and_commit() { + use kvproto::{ + kvrpcpb::{Mutation, Op}, + raft_cmdpb::{ReadIndexRequest, ReadIndexResponse}, + }; + use test_raftstore::{ + must_kv_commit, must_kv_prewrite, must_kv_prewrite_with, must_kv_read_equal, new_mutation, + }; + let mut cluster = new_server_cluster(0, 1); + cluster.run(); + + let cm = cluster.sim.read().unwrap().get_concurrency_manager(1); + + let region = cluster.get_region(b""); + let leader = region.get_peers()[0].clone(); + let follower = new_learner_peer(2, 2); + let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned(); + + let env = Arc::new(Environment::new(1)); + let channel = ChannelBuilder::new(env).connect(&addr); + let client = TikvClient::new(channel); + + let region_id = leader.get_id(); + + let mut ctx = Context::default(); + ctx.set_region_id(region_id); + ctx.set_peer(leader.clone()); + ctx.set_region_epoch(region.get_region_epoch().clone()); + + let read_index = |ranges: &[(&[u8], &[u8])], start_ts: u64| { + let mut m = raft::eraftpb::Message::default(); + m.set_msg_type(MessageType::MsgReadIndex); + let mut read_index_req = ReadIndexRequest::default(); + read_index_req.set_start_ts(start_ts); + for &(start_key, end_key) in ranges { + let mut range = KeyRange::default(); + range.set_start_key(start_key.to_vec()); + range.set_end_key(end_key.to_vec()); + read_index_req.mut_key_ranges().push(range); + } + + let rctx = ReadIndexContext { + id: Uuid::new_v4(), + request: Some(read_index_req), + locked: None, + }; + let mut e = raft::eraftpb::Entry::default(); + e.set_data(rctx.to_bytes().into()); + m.mut_entries().push(e); + m.set_from(2); + + let mut raft_msg = kvproto::raft_serverpb::RaftMessage::default(); + raft_msg.set_region_id(region.get_id()); + raft_msg.set_from_peer(follower); + raft_msg.set_to_peer(leader); + raft_msg.set_region_epoch(region.get_region_epoch().clone()); + raft_msg.set_message(m); + cluster.send_raft_msg(raft_msg).unwrap(); + + (ReadIndexResponse::default(), start_ts) + }; + + // let (k, v) = (b"k1".to_vec(), b"k2".to_vec()); + // let mut mutation = Mutation::default(); + // mutation.set_op(Op::Put); + // mutation.set_key(k.clone()); + // mutation.set_value(v); + // must_kv_prewrite(&client, ctx.clone(), vec![mutation], k.clone(), 10); + + // let block_duration = Duration::from_millis(300); + // let client_clone = client.clone(); + // let ctx_clone = ctx.clone(); + // let k_clone = k.clone(); + // let handle = std::thread::spawn(move || { + // std::thread::sleep(block_duration); + // info!("!!!!!! ZZZZ must commit"); + // must_kv_commit(&client_clone, ctx_clone, vec![k_clone], 10, 30, 100); + // }); + let cli = { + let env = Arc::new(Environment::new(1)); + let channel = ChannelBuilder::new(env).connect(&addr); + TikvClient::new(channel) + }; + + must_kv_prewrite_with( + &cli, + ctx.clone(), + vec![new_mutation(Op::Put, &b"key2"[..], &b"value1"[..])], + vec![], + b"key2".to_vec(), + 10, + 0, + true, + false, + ); + + let (resp, start_ts) = read_index(&[(b"a", b"z")], 100); + + std::thread::sleep(std::time::Duration::from_millis(10000)); + + // must_kv_commit(&cli, ctx.clone(), vec![b"key2".to_vec()], 10, 30, 100); + must_kv_read_equal(&cli, ctx.clone(), b"key2".to_vec(), b"value1".to_vec(), 100); + // handle.join().unwrap(); +} diff --git a/src/server/raftkv/mod.rs b/src/server/raftkv/mod.rs index 9f42925b6d4..fd1291c493c 100644 --- a/src/server/raftkv/mod.rs +++ b/src/server/raftkv/mod.rs @@ -801,11 +801,13 @@ impl ReadIndexObserver for ReplicaReadLockChecker { return; } assert_eq!(msg.get_entries().len(), 1); + info!("!!!!!! ZZZZ update max_ts"); let mut rctx = ReadIndexContext::parse(msg.get_entries()[0].get_data()).unwrap(); if let Some(mut request) = rctx.request.take() { let begin_instant = Instant::now(); let start_ts = request.get_start_ts().into(); + info!("!!!!!! ZZZZ update max_ts to {}", start_ts); self.concurrency_manager.update_max_ts(start_ts); for range in request.mut_key_ranges().iter_mut() { let key_bound = |key: Vec| { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 13d868849f4..b1d3910f49e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1543,6 +1543,7 @@ impl Storage { match &cmd { Command::Prewrite(Prewrite { mutations, .. }) => { + info!("!!!!!! prewrite {:?}", mutations); let keys = mutations.iter().map(|m| m.key().as_encoded()); Self::check_api_version( self.api_version,