Skip to content

Commit

Permalink
update comments throughout connection.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
yaahc committed Feb 19, 2021
1 parent 2adee7b commit 651d352
Showing 1 changed file with 47 additions and 25 deletions.
72 changes: 47 additions & 25 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
//!
//! Maps the external Zcash/Bitcoin protocol to Zebra's internal request/response
//! protocol.
//!
//! This module contains a lot of undocumented state, assumptions and invariants.
//! And it's unclear if these assumptions match the `zcashd` implementation.
//! It should be refactored into a cleaner set of request/response pairs (#1515).

use std::{
collections::HashSet,
Expand Down Expand Up @@ -43,10 +39,13 @@ use super::{
};

#[derive(Debug)]
/// Internal state machine for [`State::AwaitingResponse`] used to coordinate
/// receiving expected responses.
pub(super) enum Handler {
/// Indicates that the handler has finished processing the request.
/// An error here is scoped to the request.
Finished(Result<Response, PeerError>),
// Expected response states
Ping(Nonce),
Peers,
FindBlocks,
Expand Down Expand Up @@ -312,25 +311,62 @@ impl Handler {

#[derive(Debug)]
#[must_use = "AwaitingResponse.tx.send() must be called before drop"]
/// The current state of the [`Connection`], consumed to execute the next step of
/// the state machine.
pub(super) enum State {
/// Awaiting a client request or a peer message.
AwaitingRequest,
/// Awaiting a peer message we can interpret as a client request.
AwaitingResponse {
/// Inner state machine for handling external responses.
handler: Handler,
/// Channel used to propagate responses back to the [`Client`] in our
/// internal Response format.
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
span: tracing::Span,
},
}

impl State {
/// Execute one step of the [`Connection`] state machine event loop. This
/// function represents the core logic of [`Connection::run`] method and
/// isolates consuming the previous state and producing the next state into a
/// single function.
///
/// This function's primary purpose is to provide compile time guarantees
/// that iterations of the run loop never leave the Connection with an
/// invalid `state`, by forcing all code paths to produce state transition in
/// order to exit the function.
async fn step<Rx, S, Tx>(self, conn: &mut Connection<S, Tx>, peer_rx: &mut Rx) -> Transition
where
Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
S: Service<Request, Response = Response, Error = BoxError>,
S::Error: Into<BoxError>,
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
// At a high level, the event loop we want is as follows: we check for
// any incoming messages from the remote peer, check if they should be
// interpreted as a response to a pending client request
// (Handler::process_request), and if not, interpret them as a request
// from the remote peer to our node
// (Connection::handle_message_as_request/drive_peer_request).
//
// We also need to handle those client requests in the first place
// (Connection::handle_client_request). The client requests are received
// from the corresponding `peer::Client` over a bounded channel (with
// bound 1, to minimize buffering), but there is no relationship between
// the stream of client requests and the stream of peer messages, so we
// cannot ignore one kind while waiting on the other. Moreover, we
// cannot accept a second client request while the first one is still
// pending.
//
// To do this, we inspect the current request state.
//
// If there is no pending request, we wait on either an incoming peer message or
// an incoming request, whichever comes first.
//
// If there is a pending request, we wait only on an incoming peer message, and
// check whether it can be interpreted as a response to the pending request.
match self {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
Expand Down Expand Up @@ -437,7 +473,9 @@ impl State {
/// Enum describing the next state transition that should be taken after any
/// given `step`.
enum Transition {
/// Connection should start waiting for new requests.
AwaitRequest,
/// Connection should wait for a response to a previous request.
AwaitResponse {
handler: Handler,
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
Expand All @@ -446,15 +484,16 @@ enum Transition {
/// Closing because the client was closed or dropped, and there are
/// no more client requests.
ClientClose,
/// Closing while awaiting further client requests
/// Closing while awaiting further client requests.
Close(SharedPeerError),
/// Closing while processing a peer response to a client request
/// Closing while processing a peer response to a client request.
CloseResponse {
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
e: SharedPeerError,
},
}

/// Construct the appropriate `State` from a given `Transition` if possible.
impl TryFrom<Transition> for State {
type Error = Option<SharedPeerError>;

Expand All @@ -480,6 +519,8 @@ pub struct Connection<S, Tx> {
/// A timeout for a client request. This is stored separately from
/// State so that we can move the future out of it independently of
/// other state handling.
// I don't think this is necessary, and will try moving it into `State` in
// the next commit TODO(jane)
pub(super) request_timer: Option<Sleep>,
pub(super) svc: S,
/// A `mpsc::Receiver<ClientRequest>` that converts its results to
Expand All @@ -500,25 +541,6 @@ where
where
Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
{
// At a high level, the event loop we want is as follows: we check for any
// incoming messages from the remote peer, check if they should be interpreted
// as a response to a pending client request, and if not, interpret them as a
// request from the remote peer to our node.
//
// We also need to handle those client requests in the first place. The client
// requests are received from the corresponding `peer::Client` over a bounded
// channel (with bound 1, to minimize buffering), but there is no relationship
// between the stream of client requests and the stream of peer messages, so we
// cannot ignore one kind while waiting on the other. Moreover, we cannot accept
// a second client request while the first one is still pending.
//
// To do this, we inspect the current request state.
//
// If there is no pending request, we wait on either an incoming peer message or
// an incoming request, whichever comes first.
//
// If there is a pending request, we wait only on an incoming peer message, and
// check whether it can be interpreted as a response to the pending request.
loop {
let transition = self
.state
Expand Down

0 comments on commit 651d352

Please sign in to comment.