coord,storage: reduce runtime of Coordinator::advance_local_inputs() #12813
Conversation
        Some(updates) => updates,
        None => continue,
    };

    for update in &updates {
This check should move up, before the batches are merged. Otherwise we can turn a pair of batches, of which the first is invalid and the second is valid, into one big valid batch if the updates of the first batch are beyond that batch's upper but not beyond the second batch's upper.
    let (existing_updates, _current_upper, new_upper) = updates_by_id
        .entry(id)
        .or_insert_with(|| (Vec::new(), current_upper, T::minimum()));
    existing_updates.append(&mut updates);
Instead of building one big compound array that contains all the data, we can produce an iterator that walks through all the pieces.
diff --git a/src/dataflow-types/src/client/controller/storage.rs b/src/dataflow-types/src/client/controller/storage.rs
index 0b5d92809..9063f6065 100644
--- a/src/dataflow-types/src/client/controller/storage.rs
+++ b/src/dataflow-types/src/client/controller/storage.rs
@@ -410,15 +410,26 @@ where
) -> Result<(), StorageError> {
let mut updates_by_id = HashMap::new();
- for (id, mut updates, batch_upper) in commands {
- let current_upper = self.collection(id)?.write_frontier.frontier().to_owned();
- let (existing_updates, _current_upper, new_upper) = updates_by_id
+ for (id, updates, batch_upper) in commands {
+ for update in &updates {
+ if !update.timestamp.less_than(&batch_upper) {
+ return Err(StorageError::UpdateBeyondUpper(id));
+ }
+ }
+
+ let (total_updates, new_upper) = updates_by_id
.entry(id)
- .or_insert_with(|| (Vec::new(), current_upper, T::minimum()));
- existing_updates.append(&mut updates);
+ .or_insert_with(|| (Vec::new(), T::minimum()));
+ total_updates.push(updates);
new_upper.join_assign(&batch_upper);
}
+ let mut appends_by_id = HashMap::new();
+ for (id, (updates, upper)) in updates_by_id {
+ let current_upper = self.collection(id)?.write_frontier.frontier().to_owned();
+ appends_by_id.insert(id, (updates.into_iter().flatten(), current_upper, upper));
+ }
+
let futs = FuturesUnordered::new();
// We cannot iterate through the updates and then set off a persist call
@@ -429,17 +440,11 @@ where
// through all available write handles and see if there are any updates
// for it. If yes, we send them all in one go.
for (id, persist_handle) in self.state.persist_handles.iter_mut() {
- let (updates, upper, new_upper) = match updates_by_id.remove(id) {
+ let (updates, upper, new_upper) = match appends_by_id.remove(id) {
Some(updates) => updates,
None => continue,
};
- for update in &updates {
- if !update.timestamp.less_than(&new_upper) {
- return Err(StorageError::UpdateBeyondUpper(*id));
- }
- }
-
let new_upper = Antichain::from_elem(new_upper);
let updates = updates
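To illustrate the idea behind the suggestion (a toy sketch, not the controller code; the tuple type is just a placeholder for real updates): keep the per-command batches as a Vec of Vecs and flatten them lazily with an iterator instead of `append`ing everything into one large contiguous Vec.

```rust
fn main() {
    // Each inner Vec stands in for the updates of one command.
    let batches: Vec<Vec<(&str, u64)>> = vec![
        vec![("a", 1), ("b", 2)],
        vec![("c", 3)],
    ];

    // No compound array is materialized; the iterator walks each piece in turn.
    for update in batches.into_iter().flatten() {
        println!("{:?}", update);
    }
}
```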
    let change_batches = futs
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;
You probably want to use .try_collect() here: https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_collect
-    let change_batches = futs
-        .collect::<Vec<_>>()
-        .await
-        .into_iter()
-        .collect::<Result<Vec<_>, _>>()?;
+    let change_batches = futs.try_collect::<Vec<_>>().await?;
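For reference, a minimal standalone sketch of the pattern (toy futures, not the storage controller; `fallible` is a made-up stand-in): `try_collect` drives a `FuturesUnordered` to completion and short-circuits on the first `Err`, replacing the `collect` + `into_iter` + `collect::<Result<_, _>>()` dance.

```rust
use futures::stream::{FuturesUnordered, TryStreamExt};

async fn fallible(i: u32) -> Result<u32, String> {
    if i == 2 {
        Err(format!("future {} failed", i))
    } else {
        Ok(i * 10)
    }
}

fn main() {
    let futs: FuturesUnordered<_> = (0..4).map(fallible).collect();
    // Resolves to Err as soon as any future fails, otherwise to the Vec of Ok values.
    let out: Result<Vec<u32>, String> = futures::executor::block_on(futs.try_collect());
    println!("{:?}", out);
}
```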
Thanks for the suggestions, @petrosagg! 😊
The storage changes look good! I'm deferring to Dan for the persist ones
I do think that, given downgrade_since is idempotent, it should be possible for storage to fire off each downgrade_since call in a task and not worry about it again, but it would take some persist work (some cloning and maybe a mutex). It's also not clear whether the technique would generalize to empty compare_and_append frontier updates. If Petros is happy with this complexity in storage, then I'm also fine with it and potentially circling back later.

Does this mean I should press on #12482?
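To sketch the fire-and-forget idea (purely illustrative; `Handle` and its `downgrade_since` are made-up stand-ins for a persist handle that would first need the cloning/mutex work mentioned above, and tokio with its default multi-threaded runtime is assumed): because the call is idempotent, the caller can spawn it in a task and move on without awaiting or retrying it.

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Default)]
struct Handle {
    since: u64,
}

impl Handle {
    // Idempotent: applying the same (or an older) frontier again is a no-op.
    fn downgrade_since(&mut self, frontier: u64) {
        self.since = self.since.max(frontier);
    }
}

#[tokio::main]
async fn main() {
    let handle = Arc::new(Mutex::new(Handle::default()));
    for frontier in [3u64, 5, 4] {
        let handle = Arc::clone(&handle);
        // Fire and forget: spawn the downgrade and don't track the JoinHandle.
        tokio::spawn(async move {
            handle.lock().await.downgrade_since(frontier);
        });
    }
}
```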
Are these numbers correct? (300ms down to 300ms)
Yes! 😅 Without your (@danhhz's) PR, my changes do remove the serial nature of persist calls in the storage controller, but then postgres consensus serializes things because the pg client is behind a mutex.
I don't think it's an issue for the M1 demo because I don't think we want to create many objects. The numbers with 100 tables are in the PR description, but I also got some smaller scale numbers: runtime of
The last set of numbers is this PR. The one with |
Force-pushed from b36b393 to d95100b:
…te_read_capabilities
Before, we would set off each `compare_and_append()` call and individually await each future. Now we collect the futures of all calls in a `FuturesUnordered` and await them concurrently.
Force-pushed from d95100b to 930310b.
TFTR!
    }

    let change_batches = futs.try_collect::<Vec<_>>().await?;
Can anything bad happen if one of these futures is cancelled in the middle of an await? For example, one could be in the middle of compare_and_append, but another finishes with an error, and the first is dropped.
glad you're thinking about this! we don't do anything to test it yet, but persist intends to be cancel-safe 🤷
I think it should be fine; at least I don't think it makes things worse. Before, a compare_and_append could be cancelled right in the middle of something if the future that is returned from append() is cancelled.
Updated (and less controversial) version of #12777
Motivation
That method, which is invoked at least every `timestamp_interval`, blocks the main coordinator task and therefore should run as quickly as possible. This PR adds two mitigations that work towards reducing the runtime of `advance_local_inputs()`: we parallelize `compare_and_append()` calls in `StorageController::append()`, and we parallelize `downgrade_since()` calls in `StorageController::update_read_capabilities()`.
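To sketch the core idea (toy code, not the controller; `slow_call` stands in for a `compare_and_append()` or `downgrade_since()` round trip, and tokio is assumed): awaiting the calls one by one costs roughly the sum of their latencies, while pushing them into a `FuturesUnordered` and draining it costs roughly the slowest single call.

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;

// Stand-in for a persist call with non-trivial latency.
async fn slow_call(id: u32) -> u32 {
    tokio::time::sleep(Duration::from_millis(50)).await;
    id
}

#[tokio::main]
async fn main() {
    // Serial: ~4 * 50ms, because each call only starts after the previous one finished.
    for id in 0..4 {
        slow_call(id).await;
    }

    // Concurrent: ~50ms, because all calls are in flight at the same time.
    let futs: FuturesUnordered<_> = (0..4).map(slow_call).collect();
    let _results: Vec<u32> = futs.collect().await;
}
```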
Tips for reviewer
The first commit adds duration logging, which allows us to look into things. The rest of the individual commits have comments in the code that outline why we do things the way we do them.
I used
to understand the baseline performance and to gauge the impact of the two main changes. I did reduce `COUNT` to `100` in `tests/limits.mzcompose.py`, though, in order to not have to wait too long.

Impact of mitigations (runtime numbers on my Linux machine):
`advance_local_inputs()` is about 300ms.

The threadpool commit is from #12482 and should not be merged along with these changes. It's only in here to get a feel for its impact. We should definitely merge that PR as well, though.
Testing