Skip to content

Commit

Permalink
metastore: topologically sort dataflows
Browse files Browse the repository at this point in the history
It turns out that sorting by creation time, which I recently ripped out,
implicitly provides a topological sort, so restore it.
  • Loading branch information
benesch committed Apr 2, 2019
1 parent f6424cb commit 3287bc5
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 9 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/metastore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bincode = "1.1.2"
failure = "0.1.5"
futures = "0.1"
lazy_static = "1.3.0"
linked-hash-map = "0.5.2"
log = "0.4"
ore = { path = "../ore" }
serde = { version = "1.0.89", features = ["derive"] }
Expand Down
21 changes: 16 additions & 5 deletions src/metastore/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::sync::mpsc;
use futures::sync::mpsc::Sender;
use futures::{future, stream, Future, Sink, Stream};
use lazy_static::lazy_static;
use linked_hash_map::LinkedHashMap;
use log::error;
use serde::de::DeserializeOwned;
use serde::Serialize;
Expand Down Expand Up @@ -45,7 +46,8 @@ pub struct MetaStore<D> {
}

struct Inner<D> {
dataflows: HashMap<String, D>,
// Dataflows are stored in a LinkedHashMap to maintain topological ordering.
dataflows: LinkedHashMap<String, D>,
senders: Vec<Sender<DataflowEvent<D>>>,
}

Expand All @@ -64,7 +66,7 @@ where
}

let inner = Arc::new(Mutex::new(Inner {
dataflows: HashMap::new(),
dataflows: LinkedHashMap::new(),
senders: Vec::new(),
}));

Expand Down Expand Up @@ -287,17 +289,26 @@ where
.collect();

future::join_all(data_futs).and_then(move |results| {
let results: Vec<_> = results
let mut results: Vec<_> = results
.into_iter()
.filter_map(|(_zk, name, data)| data.map(|(bytes, _stat)| (name, bytes)))
.filter_map(|(_zk, name, data)| data.map(|(bytes, stat)| (stat.czxid, name, bytes)))
.collect();
// It is very important to present dataflows in topological order, or
// building the dataflow will crash because the dataflows upon which
// it is built will not yet be present. Sorting by order of creation is
// implicitly a topological sort, as you can't create a dataflow unless
// all the dataflows upon which it depends are already created.
//
// Warning: this assumption breaks if we start altering dataflows
// in place. That's not on the horizon, though.
results.sort_by_key(|r| r.0);

let mut send_futs: FuturesUnordered<
Box<dyn Future<Item = (), Error = failure::Error> + Send>,
> = FuturesUnordered::new();
{
let mut inner = inner.lock().unwrap();
for (name, bytes) in results {
for (_czxid, name, bytes) in results {
let dataflow: D = match BINCODER.deserialize(&bytes) {
Ok(d) => d,
Err(err) => return future::err(failure::Error::from_boxed_compat(err)).left(),
Expand Down
45 changes: 41 additions & 4 deletions src/metastore/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,29 @@ fn test_basic() -> Result<(), failure::Error> {
// Create a watch after dataflows are created.
let mut watch1b = ms1.register_dataflow_watch();

// Verify that all watches see the same events.
// The first two dataflows were created sequentially. Verify that all
// watchers saw them in their known order of creation.
assert_events(
vec![&mut watch1a, &mut watch1b, &mut watch2],
&[
DataflowEvent::Created(DummyDataflow("basic".into())),
DataflowEvent::Created(DummyDataflow("basic2".into())),
],
Order::Exact,
);

// The remaining five dataflows were created concurrently, so we don't know
// exactly what ordering they were created in.
assert_events(
vec![&mut watch1a, &mut watch1b, &mut watch2],
&[
DataflowEvent::Created(DummyDataflow("concurrent0".into())),
DataflowEvent::Created(DummyDataflow("concurrent1".into())),
DataflowEvent::Created(DummyDataflow("concurrent2".into())),
DataflowEvent::Created(DummyDataflow("concurrent3".into())),
DataflowEvent::Created(DummyDataflow("concurrent4".into())),
],
Order::MutualAgreement,
);

// Delete a dataflow.
Expand All @@ -78,6 +89,7 @@ fn test_basic() -> Result<(), failure::Error> {
assert_events(
vec![&mut watch1a, &mut watch1b, &mut watch2],
&[DataflowEvent::Deleted("basic".into())],
Order::Exact,
);

// Create a new watch, after the deletion, and verify that it sees a
Expand All @@ -87,12 +99,19 @@ fn test_basic() -> Result<(), failure::Error> {
vec![&mut watch1c],
&[
DataflowEvent::Created(DummyDataflow("basic2".into())),
],
Order::Exact,
);
assert_events(
vec![&mut watch1c],
&[
DataflowEvent::Created(DummyDataflow("concurrent0".into())),
DataflowEvent::Created(DummyDataflow("concurrent1".into())),
DataflowEvent::Created(DummyDataflow("concurrent2".into())),
DataflowEvent::Created(DummyDataflow("concurrent3".into())),
DataflowEvent::Created(DummyDataflow("concurrent4".into())),
],
Order::Exact,
);

// Drop the MetaStores, which will cancel any background futures they've
Expand All @@ -115,7 +134,16 @@ fn test_basic() -> Result<(), failure::Error> {
Ok(())
}

fn assert_events<S>(streams: S, events: &[DataflowEvent<DummyDataflow>])
#[derive(Clone, Copy, Eq, PartialEq)]
enum Order {
/// The ordering must match exactly the ordering specified.
Exact,
/// The exact ordering does not matter, but it must be consistent with the
/// other watchers.
MutualAgreement,
}

fn assert_events<S>(streams: S, events: &[DataflowEvent<DummyDataflow>], ord: Order)
where
S: IntoIterator,
S::Item: Stream<Item = DataflowEvent<DummyDataflow>>,
Expand All @@ -124,8 +152,17 @@ where
let n = events.len() as u64;
let streams = streams.into_iter().map(|s| s.take(n).collect());
let results = future::join_all(streams).wait().unwrap();
if results.len() == 0 {
panic!("assert_events called with no event streams")
}
let baseline = &results[0];
for (i, res) in results.iter().enumerate() {
assert_eq!(res, baseline, "watcher {} and 0 disagree on ordering", i)
}
for (i, mut res) in results.into_iter().enumerate() {
res.sort();
assert_eq!(res, events, "watcher {} event mismatch", i);
if ord != Order::Exact {
res.sort();
}
assert_eq!(res, events, "watcher {} does not match expected ordering", i);
}
}

0 comments on commit 3287bc5

Please sign in to comment.