Occasional command timeouts on cluster connection. #28

Closed
mstyura opened this issue May 16, 2023 · 1 comment · Fixed by #30

Comments

mstyura commented May 16, 2023

Problem description

The client application, which uses rustis to connect to a Redis cluster, sometimes doesn't receive a response to a command, or the command execution timeout is triggered.

Steps to reproduce

  1. Use rustis to connect to a Redis cluster;
  2. Send a finite set of commands to the Redis cluster (a minimal reproduction sketch is shown below).
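
A minimal reproduction sketch; the cluster URI scheme, ports, and the exact commands below are illustrative assumptions rather than the affected app, and any finite batch of commands over a cluster connection should do:

use rustis::{client::Client, commands::StringCommands, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Assumed cluster connection string; adjust scheme/hosts/ports to your setup.
    let client = Client::connect("redis+cluster://127.0.0.1:7000").await?;

    // Send a finite batch of commands and await each one; occasionally the
    // last commands never resolve or hit the command timeout.
    for i in 0..100 {
        client.set(format!("key:{i}"), "value").await?;
        let _value: String = client.get(format!("key:{i}")).await?;
    }

    Ok(())
}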

Actual result

The last commands do not receive a response, or time out.

Expected result

All commands run to completion without timeout

More details

The problem feels like some Future is not properly polled, and the computation only proceeds as long as new commands are sent to Redis.

I've done some debugging and it seems like I've found the problem, but I can't figure out an easy fix for it.
I believe there is a problem with this network loop:

loop {
    select! {
        msg = self.msg_receiver.next().fuse() => {
            if !self.handle_message(msg).await { break; }
        },
        value = self.connection.read().fuse() => {
            self.handle_result(value).await;
        }
    }
}

According to the tokio tutorial, select! drops the futures which are not "selected" (i.e. did not complete). The future returned by self.msg_receiver.next() is safe to drop because it does not lose any in-progress computation (afaik), while the future returned by self.connection.read() is not, in the case of ClusterConnection:
pub async fn read(&mut self) -> Option<Result<RespBuf>> {
    let mut request_info: RequestInfo;

    loop {
        let read_futures = self.nodes.iter_mut().map(|n| n.connection.read().boxed());
        let (result, node_idx, _) = future::select_all(read_futures).await;

        if let Some(Ok(bytes)) = &result {
            if bytes.is_push_message() {
                return result;
            }
        }

        let node_id = &self.nodes[node_idx].id;

        if let Some(sub_request) = self.pending_requests.iter_mut().find_map(|r| {
            r.sub_requests
                .iter_mut()
                .find(|sr| sr.node_id == *node_id && sr.result.is_none())
        }) {
            sub_request.result = Some(result);
        } else {
            return Some(Err(Error::Client("Received unexpected message".to_owned())));
        };

        if let Some(ri) = self.pending_requests.front() {
            trace!("request_info: {ri:?}");
            if ri.sub_requests.iter().all(|sr| sr.result.is_some()) {
                if let Some(ri) = self.pending_requests.pop_front() {
                    request_info = ri;
                    break;
                }
            }
        }
    }

    let mut sub_results =
        Vec::<Result<RespBuf>>::with_capacity(request_info.sub_requests.len());
    let mut retry_reasons = SmallVec::<[RetryReason; 1]>::new();

    for sub_request in request_info.sub_requests.iter_mut() {
        let result = sub_request.result.take()?;
        if let Some(result) = result {
            match &result {
                Ok(resp_buf) if resp_buf.is_error() => match resp_buf.to::<()>() {
                    Err(Error::Redis(RedisError {
                        kind: RedisErrorKind::Ask { hash_slot, address },
                        description: _,
                    })) => retry_reasons.push(RetryReason::Ask {
                        hash_slot,
                        address: address.clone(),
                    }),
                    Err(Error::Redis(RedisError {
                        kind: RedisErrorKind::Moved { hash_slot, address },
                        description: _,
                    })) => retry_reasons.push(RetryReason::Moved {
                        hash_slot,
                        address: address.clone(),
                    }),
                    _ => sub_results.push(result),
                },
                _ => sub_results.push(result),
            }
        } else {
            return None;
        }
    }

    if !retry_reasons.is_empty() {
        debug!(
            "read failed and will be retried. reasons: {:?}",
            retry_reasons
        );
        return Some(Err(Error::Retry(retry_reasons)));
    }

    let command_name = &request_info.command_name;
    let command_info = self
        .command_info_manager
        .get_command_info_by_name(command_name);
    let command_info = if let Some(command_info) = command_info {
        command_info
    } else {
        return Some(Err(Error::Client(format!(
            "Unknown command {}",
            command_name
        ))));
    };

    let response_policy = command_info.command_tips.iter().find_map(|tip| {
        if let CommandTip::ResponsePolicy(response_policy) = tip {
            Some(response_policy)
        } else {
            None
        }
    });

    // The response_policy tip is set for commands that reply with scalar data types,
    // or when it's expected that clients implement a non-default aggregate.
    if let Some(response_policy) = response_policy {
        match response_policy {
            ResponsePolicy::OneSucceeded => {
                self.response_policy_one_succeeded(sub_results).await
            }
            ResponsePolicy::AllSucceeded => {
                self.response_policy_all_succeeded(sub_results).await
            }
            ResponsePolicy::AggLogicalAnd => {
                self.response_policy_agg(sub_results, |a, b| i64::from(a == 1 && b == 1))
            }
            ResponsePolicy::AggLogicalOr => {
                self.response_policy_agg(
                    sub_results,
                    |a, b| if a == 0 && b == 0 { 0 } else { 1 },
                )
            }
            ResponsePolicy::AggMin => self.response_policy_agg(sub_results, i64::min),
            ResponsePolicy::AggMax => self.response_policy_agg(sub_results, i64::max),
            ResponsePolicy::AggSum => self.response_policy_agg(sub_results, |a, b| a + b),
            ResponsePolicy::Special => self.response_policy_special(sub_results).await,
        }
    } else {
        self.no_response_policy(sub_results, &request_info).await
    }
}
Here there is a non-trivial future which is a composition of several other futures. When the network loop abandons the future produced by ClusterConnection::read, it may be interrupted in the middle of execution, somewhere between
let (result, node_idx, _) = future::select_all(read_futures).await;
and one of the returns, and between that first await and the exits from the method there are multiple further suspension points (awaits).
So on the next iteration of NetworkHandler::network_loop, select! will create two new futures for MsgReceiver::next and ClusterConnection::read. The new future produced by ClusterConnection::read will first wait for new bytes from the Redis socket, while it should instead "resume" handling the response that the previous, cancelled future had already read.
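
To illustrate the mechanism outside of rustis, here is a minimal, self-contained sketch using plain tokio channels (no rustis types; read_pair stands in for ClusterConnection::read), showing how a composed future can lose in-progress work when select! drops it between awaits:

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

// Not cancel safe: if this future is dropped after the first recv()
// completed but before the second one, `first` is silently lost.
async fn read_pair(rx: &mut mpsc::Receiver<u8>) -> Option<(u8, u8)> {
    let first = rx.recv().await?;   // suspension point 1
    let second = rx.recv().await?;  // suspension point 2
    Some((first, second))
}

#[tokio::main]
async fn main() {
    let (data_tx, mut data_rx) = mpsc::channel::<u8>(8);
    let (msg_tx, mut msg_rx) = mpsc::channel::<&str>(8);

    tokio::spawn(async move {
        for b in 0u8..6 {
            data_tx.send(b).await.unwrap();
            // Interleave "commands" with data, like msg_receiver in the network loop.
            msg_tx.send("command").await.unwrap();
            sleep(Duration::from_millis(5)).await;
        }
    });

    loop {
        tokio::select! {
            // Each iteration builds a brand new read_pair future; the previous,
            // half-finished one is dropped, so a byte it had already read may be
            // lost depending on timing. This mirrors ClusterConnection::read
            // being dropped between its awaits.
            pair = read_pair(&mut data_rx) => match pair {
                Some(pair) => println!("pair: {pair:?}"),
                None => break,
            },
            Some(msg) = msg_rx.recv() => println!("msg: {msg}"),
        }
    }
}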

As a workaround, I've locally tried moving

if let Some(ri) = self.pending_requests.front() {
    trace!("request_info: {ri:?}");
    if ri.sub_requests.iter().all(|sr| sr.result.is_some()) {
        if let Some(ri) = self.pending_requests.pop_front() {
            request_info = ri;
            break;
        }
    }
}
to the beginning of the loop inside ClusterConnection::read, and it seems to make ClusterConnection::read behave like a "resumable" operation; a sketch of the reordering follows below. But I'm not sure the change I've made is correct at all, even though it seems to fix the app I've tested with.
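
For reference, a rough sketch of that reordering (only the top of the loop inside ClusterConnection::read is shown; this is the local experiment described above, not a reviewed fix):

loop {
    // Check for an already-completed pending request *before* awaiting any
    // socket read, so that a read() future cancelled right after it stored
    // the last sub_request result can effectively be "resumed" by the next call.
    if let Some(ri) = self.pending_requests.front() {
        trace!("request_info: {ri:?}");
        if ri.sub_requests.iter().all(|sr| sr.result.is_some()) {
            if let Some(ri) = self.pending_requests.pop_front() {
                request_info = ri;
                break;
            }
        }
    }

    let read_futures = self.nodes.iter_mut().map(|n| n.connection.read().boxed());
    let (result, node_idx, _) = future::select_all(read_futures).await;

    // ... rest of the loop body unchanged ...
}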

The open question is what to do next. I see two options:

  1. The network loop should not drop a future which is not yet completed, and should reuse the same future across iterations until it completes (but a quick experiment showed that it might not be trivial to make this work and keep the borrow checker happy). On a high level, Connection looks like a kind of Stream, and it could be combined with MsgReceiver using select to produce a combined stream, which could then be handled with a while loop with a match inside (see the sketch after this list);
  2. Connection::read on all types of connections must be cancel safe, so a new call to read must "resume" a previously cancelled read (the question here is how to make this robust to further changes, i.e. how to prevent a cancel-safe future from becoming unsafe again without notice).
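
To make option 1 concrete, here is a self-contained sketch of the stream-merge idea, with plain mpsc channels standing in for MsgReceiver and the cluster connection (the Event enum, channel names, and handler bodies are illustrative, not rustis API):

use futures_util::stream::{self, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Debug)]
enum Event {
    Msg(String),      // stands in for messages from MsgReceiver
    Response(String), // stands in for responses read from the connection
}

#[tokio::main]
async fn main() {
    let (msg_tx, msg_rx) = mpsc::channel::<String>(8);
    let (resp_tx, resp_rx) = mpsc::channel::<String>(8);

    tokio::spawn(async move {
        msg_tx.send("GET foo".into()).await.unwrap();
        resp_tx.send("bar".into()).await.unwrap();
    });

    // Both sources become Streams and are merged into one; the merged stream
    // is polled as a whole, so no half-finished read future is ever dropped.
    let msgs = ReceiverStream::new(msg_rx).map(Event::Msg);
    let responses = ReceiverStream::new(resp_rx).map(Event::Response);
    let mut events = stream::select(msgs, responses);

    while let Some(event) = events.next().await {
        match event {
            Event::Msg(m) => println!("handle_message: {m}"),
            Event::Response(r) => println!("handle_result: {r}"),
        }
    }
}
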
mstyura commented May 18, 2023

Seems like this was automatically closed, even though it is not fixed by the referenced MR.
