Skip to content

Commit

Permalink
rename transitions from Exit to Close
Browse files Browse the repository at this point in the history
  • Loading branch information
yaahc committed Feb 19, 2021
1 parent 5e4bf80 commit cfc4717
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,17 +335,19 @@ impl State {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
match future::select(peer_rx.next(), conn.client_rx.next()).await {
Either::Left((None, _)) => Transition::Exit(PeerError::ConnectionClosed.into()),
Either::Left((Some(Err(e)), _)) => Transition::Exit(e.into()),
Either::Left((None, _)) => {
Transition::Close(PeerError::ConnectionClosed.into())
}
Either::Left((Some(Err(e)), _)) => Transition::Close(e.into()),
Either::Left((Some(Ok(msg)), _)) => {
match conn.handle_message_as_request(msg).await {
Ok(()) => Transition::AwaitRequest,
Err(e) => Transition::Exit(e.into()),
Err(e) => Transition::Close(e.into()),
}
}
Either::Right((None, _)) => {
trace!("client_rx closed, ending connection");
Transition::ExitClient
Transition::ClientClose
}
Either::Right((Some(req), _)) => {
if req.tx.is_canceled() {
Expand Down Expand Up @@ -378,11 +380,13 @@ impl State {
.instrument(span.clone())
.await
{
Either::Left((None, _)) => Transition::ExitResponse {
Either::Left((None, _)) => Transition::CloseResponse {
e: PeerError::ConnectionClosed.into(),
tx,
},
Either::Left((Some(Err(e)), _)) => Transition::ExitResponse { e: e.into(), tx },
Either::Left((Some(Err(e)), _)) => {
Transition::CloseResponse { e: e.into(), tx }
}
Either::Left((Some(Ok(peer_msg)), _cancel)) => {
let request_msg = span.in_scope(|| handler.process_message(peer_msg));
// If the message was not consumed, check whether it
Expand All @@ -395,7 +399,7 @@ impl State {
Transition::AwaitResponse { tx, handler, span }
// Transition::AwaitRequest
}
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
}
} else {
// Otherwise, check whether the handler is finished
Expand All @@ -413,7 +417,7 @@ impl State {
trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout;
match handler {
Handler::Ping(_) => Transition::ExitResponse { e: e.into(), tx },
Handler::Ping(_) => Transition::CloseResponse { e: e.into(), tx },
_ => {
let _ = tx.send(Err(e.into()));
Transition::AwaitRequest
Expand All @@ -439,13 +443,13 @@ enum Transition {
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
span: tracing::Span,
},
/// Exiting because the client was closed or dropped, and there are
/// Closing because the client was closed or dropped, and there are
/// no more client requests.
ExitClient,
/// Exiting while awaiting further client requests
Exit(SharedPeerError),
/// Exiting while processing a peer response to a client request
ExitResponse {
ClientClose,
/// Closing while awaiting further client requests
Close(SharedPeerError),
/// Closing while processing a peer response to a client request
CloseResponse {
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
e: SharedPeerError,
},
Expand All @@ -460,9 +464,9 @@ impl TryFrom<Transition> for State {
Transition::AwaitResponse { handler, tx, span } => {
Ok(State::AwaitingResponse { handler, tx, span })
}
Transition::ExitClient => Err(None),
Transition::Exit(e) => Err(Some(e)),
Transition::ExitResponse { tx, e } => {
Transition::ClientClose => Err(None),
Transition::Close(e) => Err(Some(e)),
Transition::CloseResponse { tx, e } => {
let _ = tx.send(Err(e.clone()));
Err(Some(e))
}
Expand Down Expand Up @@ -567,15 +571,15 @@ where
tx,
span,
},
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
},
Ping(nonce) => match self.peer_tx.send(Message::Ping(nonce)).await {
Ok(()) => Transition::AwaitResponse {
handler: Handler::Ping(nonce),
tx,
span,
},
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
},
BlocksByHash(hashes) => {
match self
Expand All @@ -593,7 +597,7 @@ where
tx,
span,
},
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
}
}
TransactionsByHash(hashes) => {
Expand All @@ -612,7 +616,7 @@ where
tx,
span,
},
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
}
}
FindBlocks { known_blocks, stop } => {
Expand All @@ -626,7 +630,7 @@ where
tx,
span,
},
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
}
}
FindHeaders { known_blocks, stop } => {
Expand All @@ -640,7 +644,7 @@ where
tx,
span,
},
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
}
}
MempoolTransactions => match self.peer_tx.send(Message::Mempool).await {
Expand All @@ -649,7 +653,7 @@ where
tx,
span,
},
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
},
PushTransaction(transaction) => {
match self.peer_tx.send(Message::Tx(transaction)).await {
Expand All @@ -659,7 +663,7 @@ where
let _ = tx.send(Ok(Response::Nil));
Transition::AwaitRequest
}
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
}
}
AdvertiseTransactions(hashes) => {
Expand All @@ -674,7 +678,7 @@ where
let _ = tx.send(Ok(Response::Nil));
Transition::AwaitRequest
}
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
}
}
AdvertiseBlock(hash) => {
Expand All @@ -685,7 +689,7 @@ where
let _ = tx.send(Ok(Response::Nil));
Transition::AwaitRequest
}
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Transition::CloseResponse { e: e.into(), tx },
}
}
}
Expand Down

0 comments on commit cfc4717

Please sign in to comment.