From 20addac9e92e9855a6c38b88e045ededcd826241 Mon Sep 17 00:00:00 2001 From: Alfred Hodler Date: Sun, 12 Mar 2023 09:54:14 +0000 Subject: [PATCH] Improved connection logic - `ConnectedFrom` and `ConnectedTo` are now emitted only once the peer stream is fully writeable. --- Cargo.toml | 2 +- src/message_stream.rs | 3 + src/reactor.rs | 130 +++++++++++++++++++++++++++++++-------- tests/connect_timeout.rs | 32 ++++++++++ 4 files changed, 141 insertions(+), 26 deletions(-) create mode 100644 tests/connect_timeout.rs diff --git a/Cargo.toml b/Cargo.toml index f7d7935..6bf500a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "peerlink" -version = "0.6.0" +version = "0.7.0" edition = "2021" authors = ["Alfred Hodler "] license = "MIT" diff --git a/src/message_stream.rs b/src/message_stream.rs index 2feef6b..c879cf5 100644 --- a/src/message_stream.rs +++ b/src/message_stream.rs @@ -20,6 +20,8 @@ pub struct StreamConfig { pub tx_buf_limits: std::ops::Range, /// The duration after which a peer is disconnected if it fails to read incoming data. pub stream_write_timeout: std::time::Duration, + /// The duration after which a connection attempt is abandoned. + pub stream_connect_timeout: std::time::Duration, } impl Default for StreamConfig { @@ -28,6 +30,7 @@ impl Default for StreamConfig { rx_buf_min_size: 128 * 1024, tx_buf_limits: (128 * 1024)..message::MAX_MSG_SIZE, stream_write_timeout: std::time::Duration::from_secs(30), + stream_connect_timeout: std::time::Duration::from_secs(5), } } } diff --git a/src/reactor.rs b/src/reactor.rs index e7e2d46..a351ea2 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -223,10 +223,33 @@ impl Handle { } } -/// Contains a stream along with its peer id. +/// The direction of a peer connection. +enum Direction { + Inbound { interface: SocketAddr }, + Outbound, +} + +impl Direction { + fn is_outbound(&self) -> bool { + match self { + Direction::Inbound { .. } => false, + Direction::Outbound => true, + } + } +} + +enum ConnectState { + InProgress { start: Instant }, + Connected, +} + +/// Contains a stream along with metadata. struct Entry { stream: MessageStream, peer_id: PeerId, + direction: Direction, + connect_state: ConnectState, + addr: SocketAddr, } /// Runs the reactor in a loop until an error is produced or a shutdown command is received. @@ -266,7 +289,7 @@ fn run( let mut remove_stale: Vec = Vec::with_capacity(16); loop { - poll.poll(&mut events, Some(Duration::from_secs(5)))?; + poll.poll(&mut events, Some(Duration::from_secs(1)))?; let has_slot = has_slot(listeners.len(), streams.vacant_key()); let now = Instant::now(); @@ -289,6 +312,8 @@ fn run( &mut streams, &mut token_map, &mut next_peer_id, + addr, + Direction::Outbound, stream, config.stream_config.clone(), )?; @@ -306,7 +331,9 @@ fn run( )) }; - let _ = sender.send(Event::ConnectedTo { addr, result }); + if result.is_err() { + let _ = sender.send(Event::ConnectedTo { addr, result }); + } } Command::Disconnect(peer) => { @@ -393,16 +420,12 @@ fn run( &mut streams, &mut token_map, &mut next_peer_id, + addr, + Direction::Inbound { interface }, stream, config.stream_config.clone(), )?; log::info!("peer {peer}: accepted connection from {addr}"); - - let _ = sender.send(Event::ConnectedFrom { - peer, - addr, - interface, - }); } Err(err) if would_block(&err) => break, Err(err) => log::warn!("accept error: {}", err), @@ -413,9 +436,38 @@ fn run( (token, Some(entry)) => { let peer = entry.peer_id; - if !entry.stream.is_ready() { - log::trace!("peer: {peer}: stream not ready"); - continue; + match entry.connect_state { + ConnectState::InProgress { .. } => { + if !entry.stream.is_ready() { + log::trace!("peer: {peer}: stream not ready"); + continue; + } else { + entry.connect_state = ConnectState::Connected; + + let event = match entry.direction { + Direction::Inbound { interface } => Event::ConnectedFrom { + peer, + addr: entry.addr, + interface, + }, + Direction::Outbound => Event::ConnectedTo { + addr: entry.addr, + result: Ok(peer), + }, + }; + + poll.registry().reregister( + entry.stream.inner_mut(), + token, + Interest::READABLE, + )?; + + let _ = sender.send(event); + } + + continue; + } + ConnectState::Connected => {} } if event.is_readable() { @@ -540,22 +592,42 @@ fn run( } } - // stale stream removal - remove_stale.extend( - streams - .iter() - .filter_map(|(_, entry)| entry.stream.is_write_stale(now).then_some(entry.peer_id)), - ); + // dead stream removal + let must_remove = streams + .iter() + .filter_map(|(_, entry)| match entry.connect_state { + ConnectState::InProgress { start } + if (now - start) > config.stream_config.stream_connect_timeout => + { + if entry.direction.is_outbound() { + let _ = sender.send(Event::ConnectedTo { + addr: entry.addr, + result: Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "Connect attempt timed out", + )), + }); + } + + Some(entry.peer_id) + } + ConnectState::Connected if entry.stream.is_write_stale(now) => { + let _ = sender.send(Event::Disconnected { + peer: entry.peer_id, + reason: DisconnectReason::WriteStale, + }); + + Some(entry.peer_id) + } + _ => None, + }); + + remove_stale.extend(must_remove); for peer in remove_stale.drain(..) { - log::info!("removing stale peer {peer}"); + log::info!("removing dead peer {peer}"); remove_stream(poll.registry(), &mut streams, &mut token_map, peer)?; - - let _ = sender.send(Event::Disconnected { - peer, - reason: DisconnectReason::WriteStale, - }); } // periodic buffer resize @@ -660,18 +732,21 @@ fn write(stream: &mut MessageStream, now: Instant) -> io::Result<()> } /// Registers a peer with the poll and adds him to the stream list. +#[allow(clippy::too_many_arguments)] fn add_stream( registry: &Registry, streams: &mut Slab, token_map: &mut IntMap, next_peer_id: &mut u64, + addr: SocketAddr, + direction: Direction, mut stream: TcpStream, stream_cfg: message_stream::StreamConfig, ) -> std::io::Result { let token = Token(streams.vacant_key()); let peer_id = *next_peer_id; - registry.register(&mut stream, token, Interest::READABLE)?; + registry.register(&mut stream, token, Interest::WRITABLE)?; let prev_mapping = token_map.insert(peer_id, token); assert!(prev_mapping.is_none()); @@ -679,6 +754,11 @@ fn add_stream( streams.insert(Entry { stream: MessageStream::new(stream, stream_cfg), peer_id: PeerId(peer_id), + direction, + connect_state: ConnectState::InProgress { + start: Instant::now(), + }, + addr, }); *next_peer_id += 1; diff --git a/tests/connect_timeout.rs b/tests/connect_timeout.rs new file mode 100644 index 0000000..dec605d --- /dev/null +++ b/tests/connect_timeout.rs @@ -0,0 +1,32 @@ +use std::io::ErrorKind; +use std::net::{Ipv4Addr, SocketAddrV4}; +use std::time::Duration; + +use peerlink::{Config, Event, Reactor, StreamConfig}; + +/// Connects a client to a nonexistent peer and waits for the timeout. +#[test] +fn client_connects_to_nonexistent() { + let _ = env_logger::builder().is_test(true).try_init(); + + let (client_reactor, client_handle) = Reactor::new(Config { + stream_config: StreamConfig { + stream_connect_timeout: Duration::from_secs(1), + ..Default::default() + }, + ..Default::default() + }) + .unwrap(); + + let _ = client_reactor.run(); + + let server_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, u16::MAX); + + let _ = client_handle.send(peerlink::Command::Connect(server_addr.into())); + + let connected = client_handle.receive().unwrap(); + assert!(matches!( + connected, + Event::ConnectedTo { result: Err(err), .. } if err.kind() == ErrorKind::TimedOut + )); +}