Skip to content

Commit

Permalink
zebrad: fill in more of inbound component
Browse files Browse the repository at this point in the history
WIP: this is getting stuck on
rust-lang/rust#64552
  • Loading branch information
hdevalence committed Sep 18, 2020
1 parent 48a251d commit ab92db3
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
2 changes: 1 addition & 1 deletion zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl StartCmd {
let inbound = ServiceBuilder::new()
.load_shed()
.buffer(20)
.service(Inbound::new(setup_rx));
.service(Inbound::new(setup_rx, state.clone()));

let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound).await;
setup_tx
Expand Down
54 changes: 50 additions & 4 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ use std::{
task::{Context, Poll},
};

use futures::future::FutureExt;
use futures::{future::FutureExt, stream::StreamExt};
use tokio::sync::oneshot;
use tower::{buffer::Buffer, util::BoxService, Service};
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};

use zebra_network as zn;
use zebra_network::AddressBook;
use zebra_state as zs;

type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;

pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);

Expand All @@ -25,14 +27,16 @@ pub struct Inbound {
network_setup: Option<oneshot::Receiver<SetupData>>,
outbound: Option<Outbound>,
address_book: Option<Arc<Mutex<zn::AddressBook>>>,
state: State,
}

impl Inbound {
pub fn new(network_setup: oneshot::Receiver<SetupData>) -> Self {
pub fn new(network_setup: oneshot::Receiver<SetupData>, state: State) -> Self {
Self {
network_setup: Some(network_setup),
outbound: None,
address_book: None,
state,
}
}
}
Expand Down Expand Up @@ -89,10 +93,52 @@ impl Service<zn::Request> for Inbound {
peers.truncate(MAX_ADDR);
async { Ok(zn::Response::Peers(peers)) }.boxed()
}
_ => {
zn::Request::BlocksByHash(hashes) => {
use futures::future::ready;
let mut state = self.state.clone();
async move {
let requests = futures::stream::iter(
hashes.into_iter().map(|h| zs::Request::Block(h.into())),
);
let blocks = state
.call_all(requests)
.filter_map(|rsp| {
ready(match rsp {
Ok(zs::Response::Block(Some(block))) => Some(block),
// XXX: check how zcashd handles missing blocks?
// XXX: how do we want to handle state errors?
_ => None,
})
})
.collect::<Vec<_>>()
.await;

Ok(zn::Response::Blocks(blocks))
}.boxed()
}
zn::Request::TransactionsByHash(_transactions) => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::FindBlocks { .. } => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseTransactions(_transactions) => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseBlock(_block) => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::Ping(_) => {
unreachable!("ping requests are handled internally");
}
}
}
}

0 comments on commit ab92db3

Please sign in to comment.