Skip to content

Commit

Permalink
kv_service: fix batch empty request deadlock (tikv#7535)
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Apr 19, 2020
1 parent f5ee17b commit bd21a5b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 9 deletions.
26 changes: 18 additions & 8 deletions src/server/service/kv.rs
Expand Up @@ -23,6 +23,7 @@ use crate::storage::{
};
use engine_rocks::RocksEngine;
use futures::executor::{self, Notify, Spawn};
use futures::future::Either;
use futures::{future, Async, Future, Sink, Stream};
use grpcio::{
ClientStreamingSink, DuplexSink, Error as GrpcError, RequestStream, RpcContext, RpcStatus,
Expand Down Expand Up @@ -1132,14 +1133,23 @@ fn handle_batch_commands_request<E: Engine, L: LockManager>(
fn future_handle_empty(
req: BatchCommandsEmptyRequest,
) -> impl Future<Item = BatchCommandsEmptyResponse, Error = Error> {
tikv_util::timer::GLOBAL_TIMER_HANDLE
.delay(std::time::Instant::now() + std::time::Duration::from_millis(req.get_delay_time()))
.map(move |_| {
let mut res = BatchCommandsEmptyResponse::default();
res.set_test_id(req.get_test_id());
res
})
.map_err(|_| unreachable!())
let mut res = BatchCommandsEmptyResponse::default();
res.set_test_id(req.get_test_id());
// `BatchCommandsNotify` processes futures in notify. If delay_time is too small, notify
// can be called immediately, so the future is polled recursively and lead to deadlock.
if req.get_delay_time() < 10 {
Either::A(future::result(Ok(res)))
} else {
Either::B(
tikv_util::timer::GLOBAL_TIMER_HANDLE
.delay(
std::time::Instant::now()
+ std::time::Duration::from_millis(req.get_delay_time()),
)
.map(move |_| res)
.map_err(|_| unreachable!()),
)
}
}

fn future_get<E: Engine, L: LockManager>(
Expand Down
1 change: 1 addition & 0 deletions src/server/service/mod.rs
Expand Up @@ -8,3 +8,4 @@ mod kv;
pub use self::debug::Service as DebugService;
pub use self::diagnostics::Service as DiagnosticsService;
pub use self::kv::Service as KvService;
pub use self::kv::{batch_commands_request, batch_commands_response};
50 changes: 49 additions & 1 deletion tests/integrations/server/kv_service.rs
Expand Up @@ -2,12 +2,13 @@

use futures::{Future, Sink, Stream};
use grpcio::*;
use kvproto::tikvpb::BatchCommandsRequest;
use kvproto::tikvpb::TikvClient;
use kvproto::tikvpb::*;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;
use test_raftstore::new_server_cluster;
use tikv::server::service::batch_commands_request;
use tikv_util::HandyRwLock;

#[test]
Expand Down Expand Up @@ -52,3 +53,50 @@ fn test_batch_commands() {
});
rx.recv_timeout(Duration::from_secs(1)).unwrap();
}

#[test]
fn test_empty_commands() {
let mut cluster = new_server_cluster(0, 1);
cluster.run();

let leader = cluster.get_region(b"").get_peers()[0].clone();
let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();

let env = Arc::new(Environment::new(1));
let channel = ChannelBuilder::new(env).connect(&addr);
let client = TikvClient::new(channel);

let (mut sender, receiver) = client.batch_commands().unwrap();
for _ in 0..1000 {
let mut batch_req = BatchCommandsRequest::default();
for i in 0..10 {
let mut req = batch_commands_request::Request::default();
req.cmd = Some(batch_commands_request::request::Cmd::Empty(
Default::default(),
));
batch_req.mut_requests().push(req);
batch_req.mut_request_ids().push(i);
}
match sender.send((batch_req, WriteFlags::default())).wait() {
Ok(s) => sender = s,
Err(e) => panic!("tikv client send fail: {:?}", e),
}
}

let (tx, rx) = mpsc::sync_channel(1);
thread::spawn(move || {
// We have send 10k requests to the server, so we should get 10k responses.
let mut count = 0;
for x in receiver
.wait()
.map(move |b| b.unwrap().get_responses().len())
{
count += x;
if count == 10000 {
tx.send(1).unwrap();
return;
}
}
});
rx.recv_timeout(Duration::from_secs(1)).unwrap();
}

0 comments on commit bd21a5b

Please sign in to comment.