coord: Parallelize builtin table writes with side effects#23551
coord: Parallelize builtin table writes with side effects#23551ParkMyCar merged 6 commits intoMaterializeInc:mainfrom
Conversation
a0d9c6f to
790f656
Compare
src/adapter/src/coord/appends.rs
Outdated
| // Note: while technically we should have `GroupCommitApply` update the notifies, we | ||
| // know that internal commands get processed before any user commands, so the writes | ||
| // are still guaranteed to be observable before any user commands. | ||
| for notify in notifies { | ||
| // We don't care if the listeners have gone away. | ||
| let _ = notify.send(()); | ||
| } | ||
|
|
||
| // Trigger a GroupCommitApply, which will run before any user commands since we're | ||
| // sending it on the internal command sender. |
There was a problem hiding this comment.
This scares me a bit. It's not obviously correct and seems like something that's easy to accidentally break without realizing it. Why not just send the replies after we apply the timestamp?
There was a problem hiding this comment.
We can't send the notifies after we apply the timestamp because that requires waiting for the Coordinator to run the queued GroupCommitApply command, which it won't be able to do because the current command is waiting for the notify to resolve, so we end up in a dead lock.
Joe and I talked in a Huddle, the correctness property here relies on internal commands being run before user commands. We queue a GroupCommitApply on the internal command sender before notifying any waiters, so we know the write will be applied before any user commands could observe the intermediate state.
We don't love relying on this property though, and conveniently some PlatformV2 work will soon make it possible to give an owned handle to the timestmap oracle, to this spawned task. Then we could explicitly apply the write in the task instead of relying on the queued command to get processed before user commands. Waiting to hear back on how soon the shareable timestamp oracle will be ready!
src/adapter/src/coord/appends.rs
Outdated
|
|
||
| /// Submit a write to a system table. | ||
| /// | ||
| /// This method will block the Coordinator on doing some initial work, and then returns a |
There was a problem hiding this comment.
on doing some initial work
This seems a bit vague, what exactly does it wait for?
There was a problem hiding this comment.
Updated this comment to be more descriptive
src/adapter/src/coord/ddl.rs
Outdated
| }) | ||
| }) | ||
| .await; | ||
| .await?; |
There was a problem hiding this comment.
Why did we add a ?? Doesn't this always return an error making the rest of this function un-reachable?
There was a problem hiding this comment.
Didn't mean to do this, was a cargo culting issue. Fortunately our tests caught the bug! I updated this and made the assertions stronger here.
|
The test failures look legitimate. I'll wait until it's green to run nightly/coverage. |
790f656 to
f72a513
Compare
|
EDIT: Oops I commented this on the wrong PR, please ignore. |
254394b to
db89ed7
Compare
src/adapter/src/coord/appends.rs
Outdated
| .metrics | ||
| .append_table_duration_seconds | ||
| .with_label_values(&[label]); | ||
| .with_label_values(&["false"]); |
There was a problem hiding this comment.
Just realizing, is always using false here correct? Whether we block or not is determined by the caller so we don't really know if we're going to block here. Should we just remove the labels from this Histogram?
There was a problem hiding this comment.
You're totally right! Removing this label would be best, I'm not sure if we can do that though/if it's forward compatible, let me check
| // Run our side effects concurrently with the table updates. | ||
| let ((), ()) = futures::future::join(side_effects_fut, table_updates).await; |
There was a problem hiding this comment.
For my own Rust learnings, how does side_effects_fut and table_updates run concurrently if they both have a mutable reference to the Coordinator?
There was a problem hiding this comment.
Or am I wrong about them both needing a mutable reference?
There was a problem hiding this comment.
table_updates does not actually have a mutable reference to the Coordinator, which is how they're able to both run concurrently
src/adapter/src/coord/appends.rs
Outdated
| // Note: while technically we should have `GroupCommitApply` update the notifies, we | ||
| // know that internal commands get processed before any user commands, so the writes | ||
| // are still guaranteed to be observable before any user commands, because we | ||
| // submitted the GroupCommitApply above. |
There was a problem hiding this comment.
Should we get rid of this commit now or combine it with the one above on line 394?
There was a problem hiding this comment.
Sorry I'm a bit confused, what do you mean by this?
There was a problem hiding this comment.
I might be misreading, but it seems like the following two comments are trying to say the same thing:
// Trigger a GroupCommitApply, which will run before any user commands since we're
// sending it on the internal command sender.
// Note: while technically we should have
GroupCommitApplyupdate the notifies, we
// know that internal commands get processed before any user commands, so the writes
// are still guaranteed to be observable before any user commands, because we
// submitted the GroupCommitApply above.
So it might be a bit easier to maintain if we combined them into a single comment. That's subjective though, so feel free to ignore.
There was a problem hiding this comment.
Ah ha! Updated the comment and combined the two
| let cloud_resource_controller = self | ||
| .cloud_resource_controller | ||
| .as_ref() | ||
| .cloned() | ||
| .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?; |
There was a problem hiding this comment.
Where did this come from? Or am I missing something obvious?
There was a problem hiding this comment.
This was split out from an error case which used to occur after the catalog transaction. But I moved this up so we'd error before the transaction. Turns out this made a bunch of tests fail, so I refactored it again to just emit warnings which is what was suggested in Slack
db89ed7 to
79845a0
Compare
…rror after creating a connection
79845a0 to
fa01c77
Compare
MitigationsCompleting required mitigations increases Resilience Coverage.
Bug Hotspots:
|
e308e8b to
6dff1ed
Compare
6dff1ed to
7bf7a40
Compare
This PR improves the latency of creation DDL, e.g.
CREATE TABLE, by waiting for builtin table updates to complete concurrently with any side effects, e.g. creating a storage collection.Part of this change which might be particularly controversial, is refactoring the handling of group commits, and removing the notion of blocking. Now it's up to the caller as to whether or not they want to block on a group commit finishing, by
.await-ing a returnedFuture. This change makes it possible to wait on builtin table updates concurrently with other tasks. Specifically we no longer callself.group_commit_apply(...)directly ingroup_commit_initiate(...)instead we queue aGroupCommitApplytask on the internal command sender. I believe this is okay to do because all internal commands are handled before user commands, so we'll still process the group commit apply before users could observe any intermediate state, e.g. a table being created but not showing up inmz_tables. Briefly I was concerned about the "late" GroupCommitApply message moving the write timestamp backwards, but this invariant is enforced in the implementations of the timestamp oracle.Open Telemetry Trace
The above is the result of running
CREATE TABLE t1 (x int, y text, z timestamp)on staging with this built deployed. You can see that "group_commit_initiate" (builtin table updates) and "insert_without_overwrite" (a side effect), get run in parallel.Motivation
Improves https://github.com/MaterializeInc/database-issues/issues/7078
Tips for reviewer
Hiding white space is helpful since a number of things either got indented or de-dented a level.
This PR is split into two commits which can be reviewed independently:
Checklist
$T ⇔ Proto$Tmapping (possibly in a backwards-incompatible way), then it is tagged with aT-protolabel.