Skip to content

Commit

Permalink
introduce Transition enum
Browse files Browse the repository at this point in the history
  • Loading branch information
yaahc committed Feb 19, 2021
1 parent e6cb20e commit 6906f87
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 378 deletions.
1 change: 0 additions & 1 deletion zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use client::ClientRequest;
use client::ClientRequestReceiver;
use client::InProgressClientRequest;
use client::MustUseOneshotSender;
use error::ErrorSlot;

pub use client::Client;
pub use connection::Connection;
Expand Down
25 changes: 4 additions & 21 deletions zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,21 @@ use std::{

use futures::{
channel::{mpsc, oneshot},
future, ready,
ready,
stream::{Stream, StreamExt},
};
use tower::Service;

use crate::protocol::internal::{Request, Response};

use super::{ErrorSlot, PeerError, SharedPeerError};
use super::{PeerError, SharedPeerError};

/// The "client" duplex half of a peer connection.
pub struct Client {
// Used to shut down the corresponding heartbeat.
// This is always Some except when we take it on drop.
pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
pub(super) server_tx: mpsc::Sender<ClientRequest>,
pub(super) error_slot: ErrorSlot,
}

/// A message from the `peer::Client` to the `peer::Server`.
Expand Down Expand Up @@ -98,13 +97,6 @@ impl From<ClientRequest> for InProgressClientRequest {
}
}

impl ClientRequestReceiver {
/// Forwards to `inner.close()`
pub fn close(&mut self) {
self.inner.close()
}
}

impl Stream for ClientRequestReceiver {
type Item = InProgressClientRequest;

Expand Down Expand Up @@ -199,10 +191,7 @@ impl Service<Request> for Client {

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if ready!(self.server_tx.poll_ready(cx)).is_err() {
Poll::Ready(Err(self
.error_slot
.try_get_error()
.expect("failed servers must set their error slot")))
Poll::Ready(Err(PeerError::ConnectionClosed.into()))
} else {
Poll::Ready(Ok(()))
}
Expand All @@ -221,13 +210,7 @@ impl Service<Request> for Client {
match self.server_tx.try_send(ClientRequest { request, span, tx }) {
Err(e) => {
if e.is_disconnected() {
let ClientRequest { tx, .. } = e.into_inner();
let _ = tx.send(Err(PeerError::ConnectionClosed.into()));
future::ready(Err(self
.error_slot
.try_get_error()
.expect("failed servers must set their error slot")))
.boxed()
async { Err(PeerError::ConnectionClosed.into()) }.boxed()
} else {
// sending fails when there's not enough
// channel space, but we called poll_ready
Expand Down

0 comments on commit 6906f87

Please sign in to comment.