Skip to content

Commit

Permalink
close connection if max heartbeat miss reach
Browse files Browse the repository at this point in the history
  • Loading branch information
gftea committed Apr 18, 2024
1 parent f40ecfe commit b9da579
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion amqprs/src/net/reader_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use super::{

/////////////////////////////////////////////////////////////////////////////

const MAX_HEARTBEAT_MISS: u64 = 3;

pub(crate) struct ReaderHandler {
stream: BufIoReader,

Expand Down Expand Up @@ -211,6 +213,7 @@ impl ReaderHandler {
let max_interval: u64 = heartbeat.into();
let mut expiration = time::Instant::now() + time::Duration::from_secs(max_interval);
let mut is_network_failure = false;
let mut heartbeat_miss = 0;
loop {
tokio::select! {
biased;
Expand Down Expand Up @@ -285,10 +288,15 @@ impl ReaderHandler {
if expiration <= time::Instant::now() {
expiration = time::Instant::now() + time::Duration::from_secs(max_interval);

// TODO: what to do with missing heartbeat?
// should call self.io_failure_notify.notify_one();?
#[cfg(feature="traces")]
error!("missing heartbeat from server for {}", self.amqp_connection);
heartbeat_miss += 1;
if heartbeat_miss >= MAX_HEARTBEAT_MISS {
// Shutdown connection due to heartbeat timeout
is_network_failure = true;
break;
}
}
}
else => {
Expand Down

0 comments on commit b9da579

Please sign in to comment.