coord,dataflow: stop returning canceled responses from compute #12179
So good! Really glad you were able to get a test working. Can we figure out some way to get the CoordPeekResponse type into the coord crate though?
src/dataflow-types/src/types.rs
Outdated
                panic!("PeekResponse::unwrap_rows called on {:?}", self)
            }
        }
    }
}

/// Peek responses that the controller can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum CoordPeekResponse {
This is a bit of an abstraction violation. Can this type get hoisted into the coord crate?
I've also realized it's just...wrong. The controller never sends a Cancel, so it should still be sending a regular PeekResponse, and only the coord gets to see a CoordPeekResponse. Will move.
Moved CoordPeekResponse to coord. Added a second commit that also moves PeekResponseUnary to coord for the same reason, although that change is just code movement and has zero functional change.
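To make the split concrete, here is a minimal sketch of the idea, not the code in this PR. It assumes the compute-facing PeekResponse carries no cancel variant and that only the coord-side type does; the Rows alias, the String error payload, and the From impl are simplifications invented for illustration.

```rust
/// Stand-in for the row payload; the real type is richer (assumption).
type Rows = Vec<String>;

/// What the compute layer can report back. Note: no cancel variant.
#[derive(Clone, Debug, PartialEq)]
pub enum PeekResponse {
    Rows(Rows),
    Error(String),
}

/// What the coordinator hands to the waiting client. Only the
/// coordinator can produce `Canceled`.
#[derive(Clone, Debug, PartialEq)]
pub enum CoordPeekResponse {
    Rows(Rows),
    Error(String),
    Canceled,
}

/// Every compute response maps onto a coord response, but there is no
/// way to build `Canceled` from a compute response.
impl From<PeekResponse> for CoordPeekResponse {
    fn from(resp: PeekResponse) -> Self {
        match resp {
            PeekResponse::Rows(rows) => CoordPeekResponse::Rows(rows),
            PeekResponse::Error(err) => CoordPeekResponse::Error(err),
        }
    }
}
```

With this shape the type system, rather than convention, rules out a cancel response arriving from compute.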
Let me take a peek before merging if you don't mind! I want to check that the compute layer still does all the things it needs to do wrt collecting up the peeks and such.
The PR makes some changes that will introduce bad behavior for COMPUTE, in that canceled peeks will e.g. not be removed from ActiveReplication::peeks, which happens only by way of peek responses returned. It can be changed to happen instead in response to observing peek cancelation commands, but possibly the whole of ComputeCommandHistory should be updated in that case.
If there is a smaller change to make instead, perhaps just to ADAPTER, which can be followed by COMPUTE work that changes its own behavior, that would be easier to merge.
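A toy model of the concern, under stated assumptions: PeekTracker and PeekId are hypothetical names, and the only thing borrowed from the real code is the idea that a set of outstanding peeks is cleared by responses alone. If compute stops answering canceled peeks, nothing removes the entry.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a peek UUID.
type PeekId = u64;

/// Toy model of response-driven peek tracking.
#[derive(Default)]
pub struct PeekTracker {
    peeks: HashSet<PeekId>,
}

impl PeekTracker {
    /// A peek command registers interest in the peek.
    pub fn on_peek_command(&mut self, id: PeekId) {
        self.peeks.insert(id);
    }

    /// Only a response clears the entry. Returns whether the peek was tracked.
    pub fn on_peek_response(&mut self, id: PeekId) -> bool {
        self.peeks.remove(&id)
    }

    /// Number of peeks still considered outstanding.
    pub fn outstanding(&self) -> usize {
        self.peeks.len()
    }
}
```

In this model a peek that is canceled and never answered stays in the set for good, which is the retention problem described above.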
src/compute/src/compute_state.rs
Outdated
ComputeCommand::CancelPeeks { uuids } => self
    .compute_state
    .pending_peeks
    .retain(|peek| !uuids.contains(&peek.peek.uuid)),
I think this change is not correct, in that sending a peek response of Canceled is how we communicate back to the surrounding compute infrastructure that the peek has been dealt with. Not doing that will, I believe, result in the compute controller retaining the peek indefinitely, along with the dataflow on which it depends, and redeploying them to all replicas.
Oh, good point! The controller itself is smart enough to remove peeks when you call
But
@frankmcsherry, seems like an easy fix:
diff --git a/src/dataflow-types/src/client/replicated.rs b/src/dataflow-types/src/client/replicated.rs
index 77f181e24..3c3741f07 100644
--- a/src/dataflow-types/src/client/replicated.rs
+++ b/src/dataflow-types/src/client/replicated.rs
@@ -126,9 +126,17 @@ where
     T: timely::progress::Timestamp + differential_dataflow::lattice::Lattice + std::fmt::Debug,
 {
     async fn send(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
-        // Register an interest in the peek.
-        if let ComputeCommand::Peek(Peek { uuid, .. }) = &cmd {
-            self.peeks.insert(*uuid);
+        // Sniff out peek commands to update our peek tracking.
+        match &cmd {
+            ComputeCommand::Peek(Peek { uuid, .. }) => {
+                self.peeks.insert(*uuid);
+            }
+            ComputeCommand::CancelPeeks { uuids } => {
+                for uuid in uuids {
+                    self.peeks.remove(uuid);
+                }
+            }
+            _ => (),
         }
         // Initialize any necessary frontier tracking.
Would you still prefer to do this over two PRs?
Maybe! The actual work to do is to check out the other parts of the result pipeline, uses of
For example, there is an apparent race now in
Ah, indeed.
Ack, works for me. I think it'd be ideal to have @mjibson send that PR as a fast follow, since we've all got the context paged in right now.
If it ends up being easy, great! However, if @mjibson ends up having to invent a new approach to suppressing peek responses that does not introduce a non-droppable tombstone for each canceled peek, we can also punt on that for now!
PeekResponseUnary was never used or produced by dataflow, so should live in coord.
Change the cancel protocol between coord and controller so that coord no longer waits for cancel peek messages from the controller. Leave the enum variant, allowing dataflow to remove Cancel when it is safe to do so. See MaterializeInc#12179 (review) for more. Fixes MaterializeInc#11507
I removed the dataflow changes and left only the coord change that immediately sends the cancel response to itself.
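Roughly, the coord-only behavior could look like the sketch below. This is an illustration under assumptions, not the PR's code: PendingPeeks, PeekId, the std mpsc channel, and the two-variant CoordPeekResponse are all hypothetical simplifications. The point is that cancellation answers the waiting client right away, and a response that arrives from compute afterwards finds nothing to deliver to.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a peek UUID.
type PeekId = u64;

/// Simplified coordinator-side response (assumption: errors omitted).
#[derive(Debug, PartialEq)]
pub enum CoordPeekResponse {
    Rows(Vec<String>),
    Canceled,
}

/// Toy coordinator state: one waiting client per pending peek.
#[derive(Default)]
pub struct PendingPeeks {
    waiting: HashMap<PeekId, Sender<CoordPeekResponse>>,
}

impl PendingPeeks {
    /// Registers a peek and returns the receiver the client waits on.
    pub fn register(&mut self, id: PeekId) -> Receiver<CoordPeekResponse> {
        let (tx, rx) = channel();
        self.waiting.insert(id, tx);
        rx
    }

    /// Cancels locally: the client is answered at once, without waiting
    /// for any message from compute.
    pub fn cancel(&mut self, id: PeekId) {
        if let Some(tx) = self.waiting.remove(&id) {
            // The client may already be gone; a failed send is ignored.
            let _ = tx.send(CoordPeekResponse::Canceled);
        }
    }

    /// Delivers rows from compute. Returns false if the peek is no longer
    /// pending, for example because it was canceled first.
    pub fn deliver_rows(&mut self, id: PeekId, rows: Vec<String>) -> bool {
        match self.waiting.remove(&id) {
            Some(tx) => {
                let _ = tx.send(CoordPeekResponse::Rows(rows));
                true
            }
            None => false,
        }
    }
}
```

Because the entry is removed on cancel, a late response from compute is simply dropped in this model, so the coordinator keeps no per-peek tombstone.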
This looks great, thanks!
Change the cancel protocol between coord and controller so that coord
no longer waits for cancel peek messages from the controller. Remove
the entire enum variant and introduce a new type so it is clear that
the compute layer has no ability to send a cancel response.
Fixes #11507