Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
nakamoto/net/poll/src/reactor.rs
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
445 lines (375 sloc)
15.6 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Poll-based reactor. This is a single-threaded reactor using a `poll` loop. | |
use bitcoin::consensus::encode; | |
use bitcoin::network::message::RawNetworkMessage; | |
use crossbeam_channel as chan; | |
use nakamoto_common::block::filter::Filters; | |
use nakamoto_common::block::time::{LocalDuration, LocalTime}; | |
use nakamoto_common::block::tree::BlockTree; | |
use nakamoto_common::p2p::peer; | |
use nakamoto_p2p; | |
use nakamoto_p2p::error::Error; | |
use nakamoto_p2p::event::Event; | |
use nakamoto_p2p::protocol::{self, Command, Input, Link, Out}; | |
use log::*; | |
use std::collections::{HashMap, HashSet, VecDeque}; | |
use std::fmt::Debug; | |
use std::io; | |
use std::io::prelude::*; | |
use std::net; | |
use std::os::unix::io::AsRawFd; | |
use std::sync::Arc; | |
use std::time; | |
use std::time::SystemTime; | |
use crate::fallible; | |
use crate::socket::Socket; | |
use crate::time::TimeoutManager; | |
/// Upper bound on how long a socket read may take.
///
/// Passed to `set_read_timeout` on sockets created by `dial`.
const READ_TIMEOUT: time::Duration = time::Duration::from_secs(6);

/// Upper bound on how long a socket write may take.
///
/// Passed to `set_write_timeout` on sockets created by `dial`.
const WRITE_TIMEOUT: time::Duration = time::Duration::from_secs(3);
/// Maximum amount of time to wait for i/o. | |
const WAIT_TIMEOUT: LocalDuration = LocalDuration::from_mins(60); | |
/// Outcome of processing a batch of protocol outputs.
///
/// Tells the reactor loop whether to keep running or to return.
#[derive(Debug, Eq, PartialEq)]
#[must_use]
enum Control {
    /// Keep running the reactor loop.
    Continue,
    /// A shutdown output was received; the reactor loop should return.
    Shutdown,
}
/// Key identifying an i/o source registered with the poller.
#[derive(Clone, Debug, Eq, PartialEq)]
enum Source {
    /// A peer socket, identified by the peer's remote address.
    Peer(net::SocketAddr),
    /// The listening socket accepting inbound connections.
    Listener,
    /// The waker used to interrupt a blocking poll.
    Waker,
}
/// A single-threaded non-blocking reactor. | |
pub struct Reactor<R: Write + Read> { | |
peers: HashMap<net::SocketAddr, Socket<R, RawNetworkMessage>>, | |
connecting: HashSet<net::SocketAddr>, | |
inputs: VecDeque<Input>, | |
subscriber: chan::Sender<Event>, | |
commands: chan::Receiver<Command>, | |
sources: popol::Sources<Source>, | |
waker: Arc<popol::Waker>, | |
timeouts: TimeoutManager<()>, | |
} | |
/// The `R` parameter represents the underlying stream type, eg. `net::TcpStream`. | |
impl<R: Write + Read + AsRawFd> Reactor<R> { | |
/// Register a peer with the reactor. | |
fn register_peer(&mut self, addr: net::SocketAddr, stream: R, link: Link) { | |
self.sources | |
.register(Source::Peer(addr), &stream, popol::interest::ALL); | |
self.peers.insert(addr, Socket::from(stream, addr, link)); | |
} | |
/// Unregister a peer from the reactor. | |
fn unregister_peer(&mut self, addr: net::SocketAddr) { | |
self.connecting.remove(&addr); | |
self.inputs.push_back(Input::Disconnected(addr)); | |
self.sources.unregister(&Source::Peer(addr)); | |
self.peers.remove(&addr); | |
} | |
} | |
impl nakamoto_p2p::reactor::Reactor for Reactor<net::TcpStream> { | |
type Waker = Arc<popol::Waker>; | |
/// Construct a new reactor, given a channel to send events on. | |
fn new( | |
subscriber: chan::Sender<Event>, | |
commands: chan::Receiver<Command>, | |
) -> Result<Self, io::Error> { | |
let peers = HashMap::new(); | |
let inputs: VecDeque<Input> = VecDeque::new(); | |
let mut sources = popol::Sources::new(); | |
let waker = Arc::new(popol::Waker::new(&mut sources, Source::Waker)?); | |
let timeouts = TimeoutManager::new(); | |
let connecting = HashSet::new(); | |
Ok(Self { | |
peers, | |
connecting, | |
sources, | |
inputs, | |
subscriber, | |
commands, | |
waker, | |
timeouts, | |
}) | |
} | |
/// Run the given protocol with the reactor. | |
fn run<T: BlockTree, F: Filters, P: peer::Store, C: Fn(Event)>( | |
&mut self, | |
builder: protocol::Builder<T, F, P>, | |
listen_addrs: &[net::SocketAddr], | |
callback: C, | |
) -> Result<(), Error> { | |
let listener = if listen_addrs.is_empty() { | |
None | |
} else { | |
let listener = self::listen(listen_addrs)?; | |
let local_addr = listener.local_addr()?; | |
self.sources | |
.register(Source::Listener, &listener, popol::interest::READ); | |
self.subscriber.send(Event::Listening(local_addr))?; | |
info!("Listening on {}", local_addr); | |
Some(listener) | |
}; | |
info!("Initializing protocol.."); | |
let (tx, rx) = chan::unbounded(); | |
let mut protocol = builder.build(tx); | |
let local_time = SystemTime::now().into(); | |
protocol.initialize(local_time); | |
if let Control::Shutdown = self.process(&rx, local_time, &callback)? { | |
return Ok(()); | |
} | |
// Drain input events in case some were added during the processing of outputs. | |
while let Some(event) = self.inputs.pop_front() { | |
protocol.step(event, local_time); | |
if let Control::Shutdown = self.process(&rx, local_time, &callback)? { | |
return Ok(()); | |
} | |
} | |
// I/O readiness events populated by `popol::Sources::wait_timeout`. | |
let mut events = popol::Events::new(); | |
// Timeouts populated by `TimeoutManager::wake`. | |
let mut timeouts = Vec::with_capacity(32); | |
loop { | |
trace!( | |
"Polling {} sources and {} timeouts..", | |
self.sources.len(), | |
self.timeouts.len() | |
); | |
let timeout = self.timeouts.next().unwrap_or(WAIT_TIMEOUT).into(); | |
let result = self.sources.wait_timeout(&mut events, timeout); // Blocking. | |
let local_time = SystemTime::now().into(); | |
match result { | |
Ok(()) => { | |
for (source, ev) in events.iter() { | |
match source { | |
Source::Peer(addr) => { | |
if ev.errored || ev.hangup { | |
// Let the subsequent read fail. | |
trace!("{}: Socket error triggered: {:?}", addr, ev); | |
} | |
if ev.invalid { | |
// File descriptor was closed and is invalid. | |
// Nb. This shouldn't happen. It means the source wasn't | |
// properly unregistered, or there is a duplicate source. | |
error!("{}: Socket is invalid, removing", addr); | |
self.sources.unregister(&Source::Peer(*addr)); | |
continue; | |
} | |
if ev.writable { | |
self.handle_writable(&addr, source)?; | |
} | |
if ev.readable { | |
self.handle_readable(&addr); | |
} | |
} | |
Source::Listener => loop { | |
if let Some(ref listener) = listener { | |
let (conn, addr) = match listener.accept() { | |
Ok((conn, addr)) => (conn, addr), | |
Err(e) if e.kind() == io::ErrorKind::WouldBlock => { | |
break; | |
} | |
Err(e) => { | |
error!("Accept error: {}", e.to_string()); | |
break; | |
} | |
}; | |
conn.set_nonblocking(true)?; | |
let local_addr = conn.local_addr()?; | |
let link = Link::Inbound; | |
self.inputs.push_back(Input::Connected { | |
addr, | |
local_addr, | |
link, | |
}); | |
self.register_peer(addr, conn, link); | |
} | |
}, | |
Source::Waker => { | |
for cmd in self.commands.try_iter() { | |
self.inputs.push_back(Input::Command(cmd)); | |
} | |
} | |
} | |
} | |
} | |
Err(err) if err.kind() == io::ErrorKind::TimedOut => { | |
self.timeouts.wake(local_time, &mut timeouts); | |
if !timeouts.is_empty() { | |
for _ in timeouts.drain(..) { | |
self.inputs.push_back(Input::Timeout); | |
} | |
} | |
} | |
Err(err) => return Err(err.into()), | |
} | |
while let Some(event) = self.inputs.pop_front() { | |
protocol.step(event, local_time); | |
if let Control::Shutdown = self.process(&rx, local_time, &callback)? { | |
return Ok(()); | |
} | |
} | |
} | |
} | |
/// Wake the waker. | |
fn wake(waker: &Arc<popol::Waker>) -> io::Result<()> { | |
waker.wake() | |
} | |
/// Return a new waker. | |
/// | |
/// Used to wake up the main event loop. | |
fn waker(&self) -> Arc<popol::Waker> { | |
self.waker.clone() | |
} | |
} | |
impl Reactor<net::TcpStream> { | |
/// Process protocol state machine outputs. | |
fn process<C: Fn(Event)>( | |
&mut self, | |
outputs: &chan::Receiver<Out>, | |
local_time: LocalTime, | |
callback: C, | |
) -> Result<Control, Error> { | |
// Note that there may be messages destined for a peer that has since been | |
// disconnected. | |
for out in outputs.try_iter() { | |
match out { | |
Out::Message(addr, msg) => { | |
if let Some(peer) = self.peers.get_mut(&addr) { | |
let src = self.sources.get_mut(&Source::Peer(addr)).unwrap(); | |
{ | |
let mut s = format!("{:?}", msg.payload); | |
if s.len() > 96 { | |
s.truncate(96); | |
s.push_str("..."); | |
} | |
debug!("{}: Sending: {}", addr, s); | |
} | |
peer.queue(msg); | |
if let Err(err) = peer.drain(&mut self.inputs, src) { | |
error!("{}: Write error: {}", addr, err.to_string()); | |
peer.disconnect().ok(); | |
self.unregister_peer(addr); | |
} | |
} | |
} | |
// TODO: Use connection timeout, or handle timeouts in connection manager. | |
Out::Connect(addr, _timeout) => { | |
debug!("Connecting to {}...", &addr); | |
match self::dial(&addr) { | |
Ok(stream) => { | |
trace!("{:#?}", stream); | |
self.register_peer(addr, stream, Link::Outbound); | |
self.connecting.insert(addr); | |
self.inputs.push_back(Input::Connecting { addr }); | |
} | |
Err(err) => { | |
self.inputs.push_back(Input::Timeout); | |
error!("{}: Connection error: {}", addr, err.to_string()); | |
} | |
} | |
} | |
Out::Disconnect(addr, reason) => { | |
if let Some(peer) = self.peers.get(&addr) { | |
debug!("{}: Disconnecting: {}", addr, reason); | |
// Shutdown the connection, ignoring any potential errors. | |
// If the socket was already disconnected, this will yield | |
// an error that is safe to ignore (`ENOTCONN`). The other | |
// possible errors relate to an invalid file descriptor. | |
peer.disconnect().ok(); | |
self.unregister_peer(addr); | |
} | |
} | |
Out::SetTimeout(timeout) => { | |
self.timeouts.register((), local_time + timeout); | |
} | |
Out::Event(event) => { | |
trace!("Event: {:?}", event); | |
callback(event.clone()); | |
self.subscriber.try_send(event).unwrap(); // FIXME | |
} | |
Out::Shutdown => { | |
info!("Shutdown received"); | |
return Ok(Control::Shutdown); | |
} | |
} | |
} | |
Ok(Control::Continue) | |
} | |
fn handle_readable(&mut self, addr: &net::SocketAddr) { | |
let socket = self.peers.get_mut(&addr).unwrap(); | |
trace!("{}: Socket is readable", addr); | |
// Nb. Normally, since `poll`, which `popol` is based on, is | |
// level-triggered, we would be notified again if there was | |
// still data to be read on the socket. However, since our | |
// socket abstraction actually returns *decoded messages*, this | |
// doesn't apply. Thus, we have to loop to not miss messages. | |
loop { | |
match socket.read() { | |
Ok(msg) => { | |
self.inputs.push_back(Input::Received(*addr, msg)); | |
} | |
Err(encode::Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => { | |
break; | |
} | |
Err(err) => { | |
match err { | |
encode::Error::Io(err) if err.kind() == io::ErrorKind::UnexpectedEof => { | |
debug!("{}: Remote peer closed the connection", addr) | |
} | |
_ => error!("{}: Read error: {}", addr, err.to_string()), | |
} | |
socket.disconnect().ok(); | |
self.unregister_peer(*addr); | |
break; | |
} | |
} | |
} | |
} | |
fn handle_writable(&mut self, addr: &net::SocketAddr, source: &Source) -> io::Result<()> { | |
trace!("{}: Socket is writable", addr); | |
let src = self.sources.get_mut(source).unwrap(); | |
let socket = self.peers.get_mut(&addr).unwrap(); | |
if self.connecting.remove(addr) { | |
let local_addr = socket.local_address()?; | |
self.inputs.push_back(Input::Connected { | |
addr: socket.address, | |
local_addr, | |
link: socket.link, | |
}); | |
} | |
if let Err(err) = socket.drain(&mut self.inputs, src) { | |
error!("{}: Write error: {}", addr, err.to_string()); | |
socket.disconnect().ok(); | |
self.unregister_peer(*addr); | |
} | |
Ok(()) | |
} | |
} | |
/// Connect to a peer given a remote address. | |
fn dial(addr: &net::SocketAddr) -> Result<net::TcpStream, Error> { | |
use socket2::{Domain, Socket, Type}; | |
fallible! { Error::Io(io::ErrorKind::Other.into()) }; | |
let domain = if addr.is_ipv4() { | |
Domain::ipv4() | |
} else { | |
Domain::ipv6() | |
}; | |
let sock = Socket::new(domain, Type::stream(), None)?; | |
sock.set_read_timeout(Some(READ_TIMEOUT))?; | |
sock.set_write_timeout(Some(WRITE_TIMEOUT))?; | |
sock.set_nonblocking(true)?; | |
match sock.connect(&(*addr).into()) { | |
Ok(()) => {} | |
Err(e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {} | |
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} | |
Err(e) => return Err(e.into()), | |
} | |
Ok(sock.into_tcp_stream()) | |
} | |
// Listen for connections on the given address. | |
fn listen<A: net::ToSocketAddrs>(addr: A) -> Result<net::TcpListener, Error> { | |
let sock = net::TcpListener::bind(addr)?; | |
sock.set_nonblocking(true)?; | |
Ok(sock) | |
} |