Skip to content

Commit

Permalink
Merge pull request #7630 from danhhz/persist_future
Browse files Browse the repository at this point in the history
persist: start replacing CmdResponse with Future
  • Loading branch information
danhhz committed Aug 4, 2021
2 parents 11c2298 + 327ffc4 commit 2dd5445
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 263 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 43 additions & 30 deletions src/coord/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
use std::cell::RefCell;
use std::cmp;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::Future;
use std::path::Path;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
Expand All @@ -56,17 +57,14 @@ use itertools::Itertools;
use lazy_static::lazy_static;
use ore::metrics::MetricsRegistry;
use ore::retry::Retry;
use persist::error::Error as PersistError;
use persist::indexed::runtime::CmdResponse;
use persist::storage::SeqNo;
use rand::Rng;
use repr::adt::numeric;
use timely::communication::WorkerGuards;
use timely::order::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as _};
use tokio::runtime::Handle as TokioHandle;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;

use build_info::BuildInfo;
Expand Down Expand Up @@ -551,16 +549,25 @@ impl Coordinator {
if next_ts > self.closed_up_to {
if let Some(persist_multi) = self.catalog.persist_multi_details() {
// Close out the timestamp for persisted tables.
let (tx, rx) = std::sync::mpsc::channel();
persist_multi
//
// NB: Keep this method call outside the tokio::spawn. We're
// guaranteed by persist that writes and seals happen in order,
// but only if we synchronously wait for the (fast) registration
// of that work to return.
let seal_res = persist_multi
.write_handle
.seal(&persist_multi.all_table_ids, next_ts, tx.into());
for res in rx {
if let Err(err) = res {
// TODO: Linearizability relies on this, bubble up the error instead.
.seal(&persist_multi.all_table_ids, next_ts);
let _ = tokio::spawn(async move {
if let Err(err) = seal_res.into_future().await {
// TODO: Linearizability relies on this, bubble up the
// error instead.
//
// EDIT: On further consideration, I think it doesn't
// affect correctness if this fails, just availability
// of the table.
log::error!("failed to seal persisted stream to ts {}: {}", next_ts, err);
}
}
});
}

self.broadcast(dataflow::Command::AdvanceAllLocalInputs {
Expand Down Expand Up @@ -2199,12 +2206,9 @@ impl Coordinator {
Ok(Some(rx)) => {
tokio::spawn(async move {
let tag = match rx.await {
Ok(Ok(_)) => action.tag(),
// TODO: Try to return these errors somehow.
Ok(Err(_err)) => EndTransactionAction::Rollback.tag(),
// This case means the persistence runtime is no longer
// running (shut down or crashed)s
Err(_) => EndTransactionAction::Rollback.tag(),
Ok(_) => action.tag(),
// TODO: Try to return this error somehow.
Err(_err) => EndTransactionAction::Rollback.tag(),
};
let result = Ok(ExecuteResponse::TransactionExited { tag, was_implicit });
tx.send(result, session);
Expand All @@ -2227,7 +2231,7 @@ impl Coordinator {
&mut self,
session: &mut Session,
action: &EndTransactionAction,
) -> Result<Option<oneshot::Receiver<Result<(), CoordError>>>, CoordError> {
) -> Result<Option<impl Future<Output = Result<(), CoordError>>>, CoordError> {
let (drop_sinks, txn) = session.clear_transaction();
self.drop_sinks(drop_sinks);

Expand Down Expand Up @@ -2306,17 +2310,18 @@ impl Coordinator {
"internal error: persist_multi_details invariant violated"
)
})?;
let (tx, rx) = oneshot::channel();
let callback = Box::new(move |res: Result<SeqNo, PersistError>| {
let res = res
.map(|_| ())
.map_err(|err| CoordError::Unstructured(anyhow!("{}", err)));
let _ = tx.send(res);
// NB: Keep this method call outside any
// tokio::spawns. We're guaranteed by persist that
// writes and seals happen in order, but only if we
// synchronously wait for the (fast) registration of
// that work to return.
let write_res =
persist_multi.write_handle.write_atomic(persist_updates);
let write_res = write_res.into_future().map(|res| match res {
Ok(_) => Ok(()),
Err(err) => Err(CoordError::Unstructured(anyhow!("{}", err))),
});
persist_multi
.write_handle
.write_atomic(persist_updates, CmdResponse::Callback(callback));
return Ok(Some(rx));
return Ok(Some(write_res));
} else {
for (id, updates) in volatile_updates {
self.broadcast(dataflow::Command::Insert { id, updates });
Expand Down Expand Up @@ -3146,8 +3151,16 @@ impl Coordinator {
})
.collect();
// Persistence of system table inserts is best effort, so throw
// away the response and ignore any errors.
persist.write_handle.write(&updates, CmdResponse::Ignore);
// away the response and ignore any errors. We do, however,
// respect the note below so we don't end up with unexpected
// write and seal reorderings.
//
// NB: Keep this method call outside the tokio::spawn. We're
// guaranteed by persist that writes and seals happen in order,
// but only if we synchronously wait for the (fast) registration
// of that work to return.
let write_res = persist.write_handle.write(&updates);
let _ = tokio::spawn(async move { write_res.into_future().await });
} else {
let updates: Vec<Update> = updates
.into_iter()
Expand Down
3 changes: 3 additions & 0 deletions src/persist/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ abomonation = "0.7"
abomonation_derive = "0.5"
crossbeam-channel = "0.5"
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow.git" }
futures-util = "0.3.16"
futures-executor = "0.3.16"
log = "0.4.13"
ore = { path = "../ore", default-features = false }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] }
tokio = { version = "1.9.0", default-features = false, features = ["macros", "sync"] }

[dev-dependencies]
criterion = "0.3.5"
Expand Down
Loading

0 comments on commit 2dd5445

Please sign in to comment.