Skip to content

Commit

Permalink
Improved connection logic
Browse files Browse the repository at this point in the history
- `ConnectedFrom` and `ConnectedTo` are now emitted only once the peer stream
  is fully writeable.
  • Loading branch information
alfred-hodler committed Mar 12, 2023
1 parent 249121d commit 20addac
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "peerlink"
version = "0.6.0"
version = "0.7.0"
edition = "2021"
authors = ["Alfred Hodler <alfred_hodler@protonmail.com>"]
license = "MIT"
Expand Down
3 changes: 3 additions & 0 deletions src/message_stream.rs
Expand Up @@ -20,6 +20,8 @@ pub struct StreamConfig {
pub tx_buf_limits: std::ops::Range<usize>,
/// The duration after which a peer is disconnected if it fails to read incoming data.
pub stream_write_timeout: std::time::Duration,
/// The duration after which a connection attempt is abandoned.
pub stream_connect_timeout: std::time::Duration,
}

impl Default for StreamConfig {
Expand All @@ -28,6 +30,7 @@ impl Default for StreamConfig {
rx_buf_min_size: 128 * 1024,
tx_buf_limits: (128 * 1024)..message::MAX_MSG_SIZE,
stream_write_timeout: std::time::Duration::from_secs(30),
stream_connect_timeout: std::time::Duration::from_secs(5),
}
}
}
Expand Down
130 changes: 105 additions & 25 deletions src/reactor.rs
Expand Up @@ -223,10 +223,33 @@ impl Handle {
}
}

/// Contains a stream along with its peer id.
/// The direction of a peer connection.
enum Direction {
Inbound { interface: SocketAddr },
Outbound,
}

impl Direction {
fn is_outbound(&self) -> bool {
match self {
Direction::Inbound { .. } => false,
Direction::Outbound => true,
}
}
}

enum ConnectState {
InProgress { start: Instant },
Connected,
}

/// Contains a stream along with metadata.
struct Entry {
stream: MessageStream<TcpStream>,
peer_id: PeerId,
direction: Direction,
connect_state: ConnectState,
addr: SocketAddr,
}

/// Runs the reactor in a loop until an error is produced or a shutdown command is received.
Expand Down Expand Up @@ -266,7 +289,7 @@ fn run<C: Connector + Sync + Send + 'static>(
let mut remove_stale: Vec<PeerId> = Vec::with_capacity(16);

loop {
poll.poll(&mut events, Some(Duration::from_secs(5)))?;
poll.poll(&mut events, Some(Duration::from_secs(1)))?;

let has_slot = has_slot(listeners.len(), streams.vacant_key());
let now = Instant::now();
Expand All @@ -289,6 +312,8 @@ fn run<C: Connector + Sync + Send + 'static>(
&mut streams,
&mut token_map,
&mut next_peer_id,
addr,
Direction::Outbound,
stream,
config.stream_config.clone(),
)?;
Expand All @@ -306,7 +331,9 @@ fn run<C: Connector + Sync + Send + 'static>(
))
};

let _ = sender.send(Event::ConnectedTo { addr, result });
if result.is_err() {
let _ = sender.send(Event::ConnectedTo { addr, result });
}
}

Command::Disconnect(peer) => {
Expand Down Expand Up @@ -393,16 +420,12 @@ fn run<C: Connector + Sync + Send + 'static>(
&mut streams,
&mut token_map,
&mut next_peer_id,
addr,
Direction::Inbound { interface },
stream,
config.stream_config.clone(),
)?;
log::info!("peer {peer}: accepted connection from {addr}");

let _ = sender.send(Event::ConnectedFrom {
peer,
addr,
interface,
});
}
Err(err) if would_block(&err) => break,
Err(err) => log::warn!("accept error: {}", err),
Expand All @@ -413,9 +436,38 @@ fn run<C: Connector + Sync + Send + 'static>(
(token, Some(entry)) => {
let peer = entry.peer_id;

if !entry.stream.is_ready() {
log::trace!("peer: {peer}: stream not ready");
continue;
match entry.connect_state {
ConnectState::InProgress { .. } => {
if !entry.stream.is_ready() {
log::trace!("peer: {peer}: stream not ready");
continue;
} else {
entry.connect_state = ConnectState::Connected;

let event = match entry.direction {
Direction::Inbound { interface } => Event::ConnectedFrom {
peer,
addr: entry.addr,
interface,
},
Direction::Outbound => Event::ConnectedTo {
addr: entry.addr,
result: Ok(peer),
},
};

poll.registry().reregister(
entry.stream.inner_mut(),
token,
Interest::READABLE,
)?;

let _ = sender.send(event);
}

continue;
}
ConnectState::Connected => {}
}

if event.is_readable() {
Expand Down Expand Up @@ -540,22 +592,42 @@ fn run<C: Connector + Sync + Send + 'static>(
}
}

// stale stream removal
remove_stale.extend(
streams
.iter()
.filter_map(|(_, entry)| entry.stream.is_write_stale(now).then_some(entry.peer_id)),
);
// dead stream removal
let must_remove = streams
.iter()
.filter_map(|(_, entry)| match entry.connect_state {
ConnectState::InProgress { start }
if (now - start) > config.stream_config.stream_connect_timeout =>
{
if entry.direction.is_outbound() {
let _ = sender.send(Event::ConnectedTo {
addr: entry.addr,
result: Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"Connect attempt timed out",
)),
});
}

Some(entry.peer_id)
}
ConnectState::Connected if entry.stream.is_write_stale(now) => {
let _ = sender.send(Event::Disconnected {
peer: entry.peer_id,
reason: DisconnectReason::WriteStale,
});

Some(entry.peer_id)
}
_ => None,
});

remove_stale.extend(must_remove);

for peer in remove_stale.drain(..) {
log::info!("removing stale peer {peer}");
log::info!("removing dead peer {peer}");

remove_stream(poll.registry(), &mut streams, &mut token_map, peer)?;

let _ = sender.send(Event::Disconnected {
peer,
reason: DisconnectReason::WriteStale,
});
}

// periodic buffer resize
Expand Down Expand Up @@ -660,25 +732,33 @@ fn write(stream: &mut MessageStream<TcpStream>, now: Instant) -> io::Result<()>
}

/// Registers a peer with the poll and adds him to the stream list.
#[allow(clippy::too_many_arguments)]
fn add_stream(
registry: &Registry,
streams: &mut Slab<Entry>,
token_map: &mut IntMap<Token>,
next_peer_id: &mut u64,
addr: SocketAddr,
direction: Direction,
mut stream: TcpStream,
stream_cfg: message_stream::StreamConfig,
) -> std::io::Result<PeerId> {
let token = Token(streams.vacant_key());
let peer_id = *next_peer_id;

registry.register(&mut stream, token, Interest::READABLE)?;
registry.register(&mut stream, token, Interest::WRITABLE)?;

let prev_mapping = token_map.insert(peer_id, token);
assert!(prev_mapping.is_none());

streams.insert(Entry {
stream: MessageStream::new(stream, stream_cfg),
peer_id: PeerId(peer_id),
direction,
connect_state: ConnectState::InProgress {
start: Instant::now(),
},
addr,
});

*next_peer_id += 1;
Expand Down
32 changes: 32 additions & 0 deletions tests/connect_timeout.rs
@@ -0,0 +1,32 @@
use std::io::ErrorKind;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::Duration;

use peerlink::{Config, Event, Reactor, StreamConfig};

/// Connects a client to a nonexistent peer and waits for the timeout.
#[test]
fn client_connects_to_nonexistent() {
let _ = env_logger::builder().is_test(true).try_init();

let (client_reactor, client_handle) = Reactor::new(Config {
stream_config: StreamConfig {
stream_connect_timeout: Duration::from_secs(1),
..Default::default()
},
..Default::default()
})
.unwrap();

let _ = client_reactor.run();

let server_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, u16::MAX);

let _ = client_handle.send(peerlink::Command::Connect(server_addr.into()));

let connected = client_handle.receive().unwrap();
assert!(matches!(
connected,
Event::ConnectedTo { result: Err(err), .. } if err.kind() == ErrorKind::TimedOut
));
}

0 comments on commit 20addac

Please sign in to comment.