Skip to content

Commit

Permalink
Add a real networked test and polish the API a bit (#32)
Browse files Browse the repository at this point in the history
We want to start moving towards a usable "clustering" API, so this is a first
step there to make networking a little simpler/cleaner, and pull some stuff out
of the Hydroflow struct for now.
  • Loading branch information
justinj committed Dec 17, 2021
1 parent bd32247 commit fc48b71
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 55 deletions.
8 changes: 7 additions & 1 deletion covid_tracing_dist/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use hydroflow::compiled::{pull::SymmetricHashJoin, IteratorToPusherator, Pushera
use hydroflow::lang::collections::Iter;
use hydroflow::scheduled::ctx::{RecvCtx, SendCtx};
use hydroflow::scheduled::{handoff::VecHandoff, net::Message};
use hydroflow::tokio::net::TcpListener;
use hydroflow::{
scheduled::{graph::Hydroflow, graph_ext::GraphExt},
tl, tt,
Expand All @@ -21,7 +22,12 @@ pub(crate) async fn run_database(opts: Opts) {
let (diagnoses_in, diagnoses_out) = df.add_channel_input();
let (people_in, people_out) = df.add_channel_input();

let (network_in, network_out) = df.bind_one(opts.port).await;
let stream = TcpListener::bind(format!("localhost:{}", opts.port))
.await
.unwrap();

let (stream, _) = stream.accept().await.unwrap();
let (network_in, network_out) = df.add_tcp_stream(stream);

let (encoded_notifs_in, notifs) =
df.add_inout(|_ctx, recv: &RecvCtx<VecHandoff<Message>>, send| {
Expand Down
6 changes: 5 additions & 1 deletion covid_tracing_dist/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{Decode, Encode, Opts, CONTACTS_ADDR, DIAGNOSES_ADDR};

use hydroflow::lang::collections::Iter;
use hydroflow::scheduled::{ctx::RecvCtx, graph::Hydroflow, handoff::VecHandoff, net::Message};
use hydroflow::tokio::net::TcpStream;
use hydroflow::{
compiled::{pull::SymmetricHashJoin, InputBuild, IteratorToPusherator, PusheratorBuild},
scheduled::graph_ext::GraphExt,
Expand Down Expand Up @@ -111,7 +112,10 @@ pub(crate) async fn run_tracker(opts: Opts) {
df.add_edge(diagnoses, diagnosed_in);
df.add_edge(loop_out, loop_in);

let (network_out, network_in) = df.connect(opts.addr.as_str()).await;
let stream = TcpStream::connect(format!("localhost:{}", opts.addr))
.await
.unwrap();
let (network_out, network_in) = df.add_tcp_stream(stream);

df.add_edge(notifs_out, encoder_in);
df.add_edge(encoder_out, network_out);
Expand Down
103 changes: 50 additions & 53 deletions hydroflow/src/scheduled/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ use std::collections::VecDeque;
use std::pin::Pin;

use futures::{Sink, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

use super::{
Expand All @@ -73,7 +76,7 @@ const MESSAGE_DATA: u8 = 0;
const TYPE_LEN: usize = 1;
const ADDRESS_LEN: usize = 4;

#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Message {
Data { address: u32, batch: bytes::Bytes },
}
Expand Down Expand Up @@ -103,72 +106,66 @@ impl Message {
}

impl Hydroflow {
fn add_tcp_stream(
fn register_read_tcp_stream(
&mut self,
stream: TcpStream,
) -> (
InputPort<VecHandoff<Message>>,
OutputPort<VecHandoff<Message>>,
) {
let (reader, writer) = stream.into_split();
reader: OwnedReadHalf,
) -> OutputPort<VecHandoff<Message>> {
let reader = FramedRead::new(reader, LengthDelimitedCodec::new());
let reader_port = self.add_input_from_stream::<_, VecHandoff<_>, _>(
self.add_input_from_stream::<_, VecHandoff<_>, _>(
reader.map(|buf| Some(<Message>::decode(&buf.unwrap().into()))),
);
let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
)
}

fn register_write_tcp_stream(
&mut self,
writer: OwnedWriteHalf,
) -> InputPort<VecHandoff<Message>> {
let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
let mut message_queue = VecDeque::new();
self.add_sink(move |ctx, recv: &RecvCtx<VecHandoff<Message>>| {
let waker = ctx.waker();
let mut cx = std::task::Context::from_waker(&waker);

// TODO(mingwei): queue may grow unbounded? Subtle rate matching concern.
// TODO(mingwei): put into state system.
message_queue.extend(recv.take_inner().into_iter());
while !message_queue.is_empty() {
if let std::task::Poll::Ready(Ok(())) = Pin::new(&mut writer).poll_ready(&mut cx) {
let v = message_queue.pop_front().unwrap();
let mut buf = Vec::new();
v.encode(&mut buf);

let writer_port: InputPort<VecHandoff<Message>> =
self.add_sink(move |ctx, recv: &RecvCtx<VecHandoff<Message>>| {
let waker = ctx.waker();
let mut cx = std::task::Context::from_waker(&waker);

// TODO(mingwei): queue may grow unbounded? Subtle rate matching concern.
// TODO(mingwei): put into state system.
message_queue.extend(recv.take_inner().into_iter());
while !message_queue.is_empty() {
if let std::task::Poll::Ready(Ok(())) =
Pin::new(&mut writer).poll_ready(&mut cx)
{
let v = message_queue.pop_front().unwrap();
let mut buf = Vec::new();
v.encode(&mut buf);

Pin::new(&mut writer).start_send(buf.into()).unwrap();
}
Pin::new(&mut writer).start_send(buf.into()).unwrap();
}
let _ = Pin::new(&mut writer).poll_flush(&mut cx);
});
(writer_port, reader_port)
}
let _ = Pin::new(&mut writer).poll_flush(&mut cx);
})
}

// Connects to the specified address, returning an input and output port
// allowing communication on it.
pub async fn connect(
&mut self,
addr: &str,
) -> (
InputPort<VecHandoff<Message>>,
OutputPort<VecHandoff<Message>>,
) {
let stream = TcpStream::connect(addr).await.unwrap();
self.add_tcp_stream(stream)
pub fn add_write_tcp_stream(&mut self, stream: TcpStream) -> InputPort<VecHandoff<Message>> {
let (_, writer) = stream.into_split();

self.register_write_tcp_stream(writer)
}

// Waits for a single connection on the specified unix port, returning an input
// and output port allowing communication on it.
pub async fn bind_one(
pub fn add_read_tcp_stream(&mut self, stream: TcpStream) -> OutputPort<VecHandoff<Message>> {
let (reader, _) = stream.into_split();

self.register_read_tcp_stream(reader)
}

pub fn add_tcp_stream(
&mut self,
port: usize,
stream: TcpStream,
) -> (
InputPort<VecHandoff<Message>>,
OutputPort<VecHandoff<Message>>,
) {
let stream = TcpListener::bind(format!("localhost:{}", port))
.await
.unwrap();
let (stream, _) = stream.accept().await.unwrap();
self.add_tcp_stream(stream)
let (reader, writer) = stream.into_split();

(
self.register_write_tcp_stream(writer),
self.register_read_tcp_stream(reader),
)
}
}
81 changes: 81 additions & 0 deletions hydroflow/tests/networked.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::sync::mpsc::channel;

use hydroflow::{
lang::collections::Iter,
scheduled::{
ctx::RecvCtx, graph::Hydroflow, graph_demux::GraphDemux, graph_ext::GraphExt,
handoff::VecHandoff, net::Message,
},
};
use tokio::net::{TcpListener, TcpStream};

#[test]
fn test_networked() {
let (port_sender, port_receiver) = channel();

std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let mut df = Hydroflow::new();

let stream = TcpListener::bind("localhost:0").await.unwrap();
let addr = stream.local_addr().unwrap();

port_sender.send(addr.port()).unwrap();

let (stream, _) = stream.accept().await.unwrap();
let network_send = df.add_write_tcp_stream(stream);

let (input, out) = df.add_input();

input.give(Iter(
vec![Message::Data {
address: 0,
batch: vec![1, 2, 3, 4].into(),
}]
.into_iter(),
));

df.add_edge(out, network_send);

df.run_async().await.unwrap();
});
});

let (send, recv) = channel();

std::thread::spawn(move || {
let port = port_receiver.recv().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let mut df = Hydroflow::new();

let stream = TcpStream::connect(format!("localhost:{}", port))
.await
.unwrap();
let network_recv = df.add_read_tcp_stream(stream);

let input = df.add_sink(move |_ctx, recv: &RecvCtx<VecHandoff<_>>| {
for v in recv.take_inner() {
send.send(v).unwrap();
}
});

let (demux, input_port) = df.add_demux::<_, _, _, VecHandoff<_>>(|_| ());

df.add_edge(network_recv, input_port);
df.add_demux_edge(&demux, (), input);

df.run_async().await.unwrap();
});
});

let val = recv.recv().unwrap();
assert_eq!(
val,
Message::Data {
address: 0,
batch: vec![1, 2, 3, 4].into(),
}
);
}

0 comments on commit fc48b71

Please sign in to comment.