Skip to content

Commit

Permalink
Rename make_handoff to make_edge
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Feb 4, 2022
1 parent b27c79c commit c5f726c
Show file tree
Hide file tree
Showing 18 changed files with 59 additions and 62 deletions.
10 changes: 5 additions & 5 deletions benches/benches/fork_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
b.iter(|| {
let mut df = Hydroflow::new();

let (start_send, start_recv) = df.make_handoff::<VecHandoff<usize>>();
let (start_send, start_recv) = df.make_edge::<VecHandoff<usize>>();

let mut sent = false;
df.add_subgraph_source(start_send, move |_ctx, send| {
Expand All @@ -25,8 +25,8 @@ fn benchmark_hydroflow(c: &mut Criterion) {
}
});

let (send1, mut recv1) = df.make_handoff::<VecHandoff<_>>();
let (send2, mut recv2) = df.make_handoff::<VecHandoff<_>>();
let (send1, mut recv1) = df.make_edge::<VecHandoff<_>>();
let (send2, mut recv2) = df.make_edge::<VecHandoff<_>>();

df.add_subgraph_in_2out(start_recv, send1, send2, |_ctx, recv, send1, send2| {
for v in recv.take_inner().into_iter() {
Expand All @@ -39,8 +39,8 @@ fn benchmark_hydroflow(c: &mut Criterion) {
});

for _ in 0..NUM_OPS {
let (send1, next_recv1) = df.make_handoff();
let (send2, next_recv2) = df.make_handoff();
let (send1, next_recv1) = df.make_edge();
let (send2, next_recv2) = df.make_edge();

df.add_subgraph_2in_2out(
recv1,
Expand Down
4 changes: 2 additions & 2 deletions benches/benches/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
b.iter(|| {
let mut df = Hydroflow::new();

let (next_send, mut next_recv) = df.make_handoff::<VecHandoff<usize>>();
let (next_send, mut next_recv) = df.make_edge::<VecHandoff<usize>>();

let mut sent = false;
df.add_subgraph_source(next_send, move |_ctx, send| {
Expand All @@ -164,7 +164,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
}
});
for _ in 0..NUM_OPS {
let (next_send, next_next_recv) = df.make_handoff();
let (next_send, next_next_recv) = df.make_edge();

df.add_subgraph_in_out(next_recv, next_send, |_ctx, recv, send| {
send.give(Iter(recv.take_inner().into_iter()));
Expand Down
18 changes: 9 additions & 9 deletions benches/benches/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ fn benchmark_hydroflow_scheduled(c: &mut Criterion) {
let mut df = Hydroflow::new();

type Hoff = VecHandoff<usize>;
let (reachable_out, merge_lhs) = df.make_handoff::<Hoff>();
let (neighbors_out, merge_rhs) = df.make_handoff::<Hoff>();
let (merge_out, distinct_in) = df.make_handoff::<Hoff>();
let (distinct_out, tee_in) = df.make_handoff::<Hoff>();
let (tee_out1, neighbors_in) = df.make_handoff::<Hoff>();
let (tee_out2, sink_in) = df.make_handoff::<Hoff>();
let (reachable_out, merge_lhs) = df.make_edge::<Hoff>();
let (neighbors_out, merge_rhs) = df.make_edge::<Hoff>();
let (merge_out, distinct_in) = df.make_edge::<Hoff>();
let (distinct_out, tee_in) = df.make_edge::<Hoff>();
let (tee_out1, neighbors_in) = df.make_edge::<Hoff>();
let (tee_out2, sink_in) = df.make_edge::<Hoff>();

df.add_subgraph_source(reachable_out, move |_ctx, send| {
send.give(Some(1));
Expand Down Expand Up @@ -207,9 +207,9 @@ fn benchmark_hydroflow(c: &mut Criterion) {
// A dataflow that represents graph reachability.
let mut df = Hydroflow::new();

let (reachable_out, origins_in) = df.make_handoff::<VecHandoff<usize>>();
let (did_reach_out, possible_reach_in) = df.make_handoff::<VecHandoff<usize>>();
let (output_out, sink_in) = df.make_handoff::<VecHandoff<usize>>();
let (reachable_out, origins_in) = df.make_edge::<VecHandoff<usize>>();
let (did_reach_out, possible_reach_in) = df.make_edge::<VecHandoff<usize>>();
let (output_out, sink_in) = df.make_edge::<VecHandoff<usize>>();

df.add_subgraph_source(reachable_out, move |_ctx, send| {
send.give(Some(1));
Expand Down
3 changes: 1 addition & 2 deletions chat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ pub(crate) async fn run_server(opts: Opts) {

// 2. feed new members into the join
// But first, we need a buffer to turn push into pull for cross_join.
let (memberships_push, memberships_pull) =
hf.make_handoff::<VecHandoff<String>, Option<String>>();
let (memberships_push, memberships_pull) = hf.make_edge::<VecHandoff<String>, Option<String>>();
// and now the other start_tee
let member_join_input = hf
.start_tee()
Expand Down
4 changes: 2 additions & 2 deletions covid_tracing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ fn main() {
df.add_channel_input::<_, VecHandoff<(Pid, (DateTime, DateTime))>>();
let (people_send, people_recv) = df.add_channel_input::<_, VecHandoff<(Pid, (Name, Phone))>>();

let (loop_send, loop_recv) = df.make_handoff::<VecHandoff<(Pid, DateTime)>>();
let (notifs_send, notifs_recv) = df.make_handoff::<VecHandoff<(Pid, DateTime)>>();
let (loop_send, loop_recv) = df.make_edge::<VecHandoff<(Pid, DateTime)>>();
let (notifs_send, notifs_recv) = df.make_edge::<VecHandoff<(Pid, DateTime)>>();

type MyJoinState = RefCell<JoinState<&'static str, (usize, usize), (&'static str, usize)>>;
let state_handle = df.add_state(MyJoinState::default());
Expand Down
6 changes: 3 additions & 3 deletions covid_tracing_dist/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ pub(crate) async fn run_database(opts: Opts) {

let mut df = Hydroflow::new();

let (notifs, notif_sink) = df.make_handoff::<VecHandoff<(String, usize)>>();
let (encode_contacts_out, contacts_merge) = df.make_handoff::<VecHandoff<Message>>();
let (encode_diagnoses_out, diagnoses_merge) = df.make_handoff::<VecHandoff<Message>>();
let (notifs, notif_sink) = df.make_edge::<VecHandoff<(String, usize)>>();
let (encode_contacts_out, contacts_merge) = df.make_edge::<VecHandoff<Message>>();
let (encode_diagnoses_out, diagnoses_merge) = df.make_edge::<VecHandoff<Message>>();

let (contacts_in, contacts_out) =
df.add_channel_input::<_, VecHandoff<(&'static str, &'static str, usize)>>();
Expand Down
8 changes: 4 additions & 4 deletions covid_tracing_dist/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ pub(crate) async fn run_tracker(opts: Opts) {
let stream = TcpStream::connect(opts.addr).await.unwrap();
let (network_out, network_in) = df.add_tcp_stream(stream);

let (contacts, contacts_in) = df.make_handoff::<VecHandoff<(String, String, usize)>>();
let (diagnoses, diagnosed_in) = df.make_handoff::<VecHandoff<(String, (usize, usize))>>();
let (loop_out, loop_in) = df.make_handoff::<VecHandoff<(Pid, DateTime)>>();
let (notifs_out, encoder_in) = df.make_handoff::<VecHandoff<(Pid, DateTime)>>();
let (contacts, contacts_in) = df.make_edge::<VecHandoff<(String, String, usize)>>();
let (diagnoses, diagnosed_in) = df.make_edge::<VecHandoff<(String, (usize, usize))>>();
let (loop_out, loop_in) = df.make_edge::<VecHandoff<(Pid, DateTime)>>();
let (notifs_out, encoder_in) = df.make_edge::<VecHandoff<(Pid, DateTime)>>();

df.add_subgraph(
tl!(network_in),
Expand Down
6 changes: 2 additions & 4 deletions hydroflow/src/builder/hydroflow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ pub struct HydroflowBuilder {
impl HydroflowBuilder {
/// Creates a handoff, returning push and pull ends which can be chained
/// using the Surface API.
pub fn make_handoff<H, T>(
&mut self,
) -> (HandoffPushSurfaceReversed<H, T>, HandoffPullSurface<H>)
pub fn make_edge<H, T>(&mut self) -> (HandoffPushSurfaceReversed<H, T>, HandoffPullSurface<H>)
where
H: Handoff + CanReceive<T>,
{
let (send, recv) = self.hydroflow.make_handoff();
let (send, recv) = self.hydroflow.make_edge();
let push = HandoffPushSurfaceReversed::new(send);
let pull = HandoffPullSurface::new(recv);
(push, pull)
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ fn test_covid() {

let mut builder = HydroflowBuilder::default();

let (loop_send, loop_recv) = builder.make_handoff::<VecHandoff<(Pid, DateTime)>, _>();
let (notifs_send, notifs_recv) = builder.make_handoff::<VecHandoff<(Pid, DateTime)>, _>();
let (loop_send, loop_recv) = builder.make_edge::<VecHandoff<(Pid, DateTime)>, _>();
let (notifs_send, notifs_recv) = builder.make_edge::<VecHandoff<(Pid, DateTime)>, _>();

let (diagnosed_send, diagnosed) =
builder.add_channel_input::<Option<(Pid, (DateTime, DateTime))>, VecHandoff<_>>();
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/src/builder/surface/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ where
Other: PullSurface<ItemOut = (Key, Val)>,
{
let (local_inputs_send, local_inputs_recv) =
builder.make_handoff::<VecHandoff<(Key, Val)>, Option<(Key, Val)>>();
builder.make_edge::<VecHandoff<(Key, Val)>, Option<(Key, Val)>>();

let num_participants: u64 = address_book.len().try_into().unwrap();

Expand Down Expand Up @@ -118,7 +118,7 @@ where
Other: PullSurface<ItemOut = U>,
{
let (local_inputs_send, local_inputs_recv) =
builder.make_handoff::<VecHandoff<U>, Option<U>>();
builder.make_edge::<VecHandoff<U>, Option<U>>();

builder.add_subgraph(
IterPullSurface::new(addresses.into_iter())
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl Hydroflow {
sg_id
}

pub fn make_handoff<H>(&mut self) -> (InputPort<H>, OutputPort<H>)
pub fn make_edge<H>(&mut self) -> (InputPort<H>, OutputPort<H>)
where
H: 'static + Handoff,
{
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/src/scheduled/graph_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl GraphExt for Hydroflow {
use std::sync::mpsc;

let (sender, receiver) = mpsc::sync_channel(8000);
let (send_port, recv_port) = self.make_handoff();
let (send_port, recv_port) = self.make_edge();
let sg_id = self.add_subgraph_source::<_, W>(send_port, move |_ctx, send| {
for x in receiver.try_iter() {
send.give(x);
Expand All @@ -145,7 +145,7 @@ impl GraphExt for Hydroflow {
{
let input = super::input::Buffer::default();
let inner_input = input.clone();
let (send_port, recv_port) = self.make_handoff::<W>();
let (send_port, recv_port) = self.make_edge::<W>();
let sg_id = self.add_subgraph_source::<_, W>(send_port, move |_ctx, send| {
for x in (*inner_input.0).borrow_mut().drain(..) {
send.give(x);
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/scheduled/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
pub mod context;
pub mod graph;
pub mod port;
pub mod graph_ext;
pub mod handoff;
#[cfg(feature = "variadic_generics")]
pub mod input;
pub mod net;
pub mod port;
pub mod query;
pub mod reactor;
pub mod state;
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/src/scheduled/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Hydroflow {
reader: OwnedReadHalf,
) -> OutputPort<VecHandoff<Message>> {
let reader = FramedRead::new(reader, LengthDelimitedCodec::new());
let (send_port, recv_port) = self.make_handoff();
let (send_port, recv_port) = self.make_edge();
self.add_input_from_stream(
send_port,
reader.map(|buf| Some(<Message>::decode(buf.unwrap().into()))),
Expand All @@ -118,7 +118,7 @@ impl Hydroflow {
let mut writer = FramedWrite::new(writer, LengthDelimitedCodec::new());
let mut message_queue = VecDeque::new();

let (input_port, output_port) = self.make_handoff::<VecHandoff<Message>>();
let (input_port, output_port) = self.make_edge::<VecHandoff<Message>>();
self.add_subgraph_sink(output_port, move |ctx, recv| {
let waker = ctx.waker();
let mut cx = std::task::Context::from_waker(&waker);
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/src/scheduled/net/network_vertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Hydroflow {
}
});

let (send_port, recv_port) = self.make_handoff();
let (send_port, recv_port) = self.make_edge();
self.add_input_from_stream(send_port, incoming_messages.map(Some));

(port, recv_port)
Expand Down Expand Up @@ -199,7 +199,7 @@ impl Hydroflow {

let mut buffered_messages = Vec::new();
let mut next_messages = Vec::new();
let (input_port, output_port) = self.make_handoff();
let (input_port, output_port) = self.make_edge();
self.add_subgraph_sink(output_port, move |_ctx, recv| {
buffered_messages.extend(recv.take_inner());
for msg in buffered_messages.drain(..) {
Expand Down
12 changes: 6 additions & 6 deletions hydroflow/src/scheduled/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl Query {
{
let mut df = self.df.borrow_mut();

let (send_port, recv_port) = df.make_handoff();
let (send_port, recv_port) = df.make_edge();
df.add_subgraph_source(send_port, f);

Operator {
Expand All @@ -40,7 +40,7 @@ impl Query {
{
let mut df = self.df.borrow_mut();

let (send_port, recv_port) = df.make_handoff();
let (send_port, recv_port) = df.make_edge();
df.add_subgraph_n_m(
ops.into_iter().map(|op| op.recv_port).collect(),
vec![send_port],
Expand Down Expand Up @@ -81,7 +81,7 @@ where
{
let mut df = self.df.borrow_mut();

let (send_port, recv_port) = df.make_handoff();
let (send_port, recv_port) = df.make_edge();
df.add_subgraph_in_out(self.recv_port, send_port, move |_ctx, recv, send| {
send.give(Iter(recv.take_inner().into_iter().map(&mut f)));
});
Expand All @@ -100,7 +100,7 @@ where
{
let mut df = self.df.borrow_mut();

let (send_port, recv_port) = df.make_handoff();
let (send_port, recv_port) = df.make_edge();
df.add_subgraph_in_out(self.recv_port, send_port, move |_ctx, recv, send| {
send.give(Iter(recv.take_inner().into_iter().filter(&mut f)));
});
Expand All @@ -118,7 +118,7 @@ where

let mut df = self.df.borrow_mut();

let (send_port, recv_port) = df.make_handoff::<VecHandoff<T>>();
let (send_port, recv_port) = df.make_edge::<VecHandoff<T>>();
df.add_subgraph_2in_out(
self.recv_port,
other.recv_port,
Expand Down Expand Up @@ -163,7 +163,7 @@ impl<T: Clone> Operator<T> {
let mut sends = Vec::with_capacity(n);
let mut recvs = Vec::with_capacity(n);
for _ in 0..n {
let (send_port, recv_port) = df.make_handoff::<VecHandoff<T>>();
let (send_port, recv_port) = df.make_edge::<VecHandoff<T>>();
sends.push(send_port);
recvs.push(Operator {
df: self.df.clone(),
Expand Down
24 changes: 12 additions & 12 deletions hydroflow/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ fn map_filter() {
// A simple dataflow with one source feeding into one sink with some processing in the middle.
let mut df = Hydroflow::new();

let (source, map_in) = df.make_handoff::<VecHandoff<i32>>();
let (map_out, filter_in) = df.make_handoff::<VecHandoff<i32>>();
let (filter_out, sink) = df.make_handoff::<VecHandoff<i32>>();
let (source, map_in) = df.make_edge::<VecHandoff<i32>>();
let (map_out, filter_in) = df.make_edge::<VecHandoff<i32>>();
let (filter_out, sink) = df.make_edge::<VecHandoff<i32>>();

let data = [1, 2, 3, 4];
df.add_subgraph(tl!(), tl!(source), move |_ctx, tl!(), tl!(send)| {
Expand Down Expand Up @@ -67,7 +67,7 @@ fn map_filter() {
#[test]
fn test_basic_variadic() {
let mut df = Hydroflow::new();
let (source_send, sink_recv) = df.make_handoff::<VecHandoff<usize>>();
let (source_send, sink_recv) = df.make_edge::<VecHandoff<usize>>();
df.add_subgraph_source(source_send, move |_ctx, send| {
send.give(Some(5));
});
Expand All @@ -91,7 +91,7 @@ fn test_basic_variadic() {
fn test_basic_n_m() {
let mut df = Hydroflow::new();

let (source_send, sink_recv) = df.make_handoff::<VecHandoff<usize>>();
let (source_send, sink_recv) = df.make_edge::<VecHandoff<usize>>();

df.add_subgraph_n_m(
vec![],
Expand Down Expand Up @@ -140,12 +140,12 @@ fn test_cycle() {

let mut df = Hydroflow::new();

let (reachable, merge_lhs) = df.make_handoff::<VecHandoff<usize>>();
let (neighbors_out, merge_rhs) = df.make_handoff::<VecHandoff<usize>>();
let (merge_out, distinct_in) = df.make_handoff::<VecHandoff<usize>>();
let (distinct_out, tee_in) = df.make_handoff::<VecHandoff<usize>>();
let (tee_out1, neighbors_in) = df.make_handoff::<VecHandoff<usize>>();
let (tee_out2, sink_in) = df.make_handoff::<VecHandoff<usize>>();
let (reachable, merge_lhs) = df.make_edge::<VecHandoff<usize>>();
let (neighbors_out, merge_rhs) = df.make_edge::<VecHandoff<usize>>();
let (merge_out, distinct_in) = df.make_edge::<VecHandoff<usize>>();
let (distinct_out, tee_in) = df.make_edge::<VecHandoff<usize>>();
let (tee_out1, neighbors_in) = df.make_edge::<VecHandoff<usize>>();
let (tee_out2, sink_in) = df.make_edge::<VecHandoff<usize>>();

let mut initially_reachable = vec![1];
df.add_subgraph_source(reachable, move |_ctx, send| {
Expand Down Expand Up @@ -343,7 +343,7 @@ fn test_input_channel() {
let done_inner = done.clone();
let mut df = Hydroflow::new();

let (in_chan, input) = df.make_handoff();
let (in_chan, input) = df.make_edge();
df.add_input_from_stream::<_, VecHandoff<usize>, _>(in_chan, receiver);
df.add_subgraph_sink(input, move |_ctx, recv| {
for v in recv.take_inner() {
Expand Down
2 changes: 1 addition & 1 deletion relalg/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub(crate) fn run_dataflow(r: RelExpr) -> Vec<Vec<Datum>> {
}

fn render_relational(df: &mut Hydroflow, r: RelExpr) -> OutputPort<VecHandoff<Vec<Datum>>> {
let (send_port, recv_port) = df.make_handoff();
let (send_port, recv_port) = df.make_edge();
match r {
RelExpr::Values(mut v) => {
// TODO: drip-feed data?
Expand Down

0 comments on commit c5f726c

Please sign in to comment.