pg sources: demux subsource errors #18539
85c20f3 to 79fd9d7
@petrosagg Ready for review; will fix up failing doctests in the next push.
Do you mind if I test this tomorrow? Thank you.
@philip-stoev no problem
Apologies for the delay, and thank you for refactoring the tests. I think they cover all the scenarios that are relevant.
src/storage/src/source/types.rs
Outdated
///
/// This should be 0 to output an error for the primary source; however, you will need to
/// determine a numbering scheme to output errors for subsources.
pub output: usize,
I think we want to go the other way with where the output index lives. The only reason output is a member of SourceMessage is that this error demuxing work had not been done. Now that it is done, we should instead have the SourceRender::render function return a collection of type Collection<G, (usize, Result<SourceMessage<Self::Key, Self::Value>, SourceReaderError>), Diff>. Similarly for the health status updates, where we should return a Stream<G, (usize, HealthStatusUpdate)>.
This makes partitioning to N separate streams trivial and also removes the concept of multiple outputs from these structs which don't really care about this concept of outputs.
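The partitioning step described above could look roughly like the following. This is a minimal sketch using plain vectors rather than timely/differential collections; `partition_by_output`, the stand-in `SourceMessage` and `SourceReaderError` structs, and the choice to count out-of-range indexes instead of panicking are all assumptions made for illustration, not the code in this PR.

```rust
/// Hypothetical stand-ins for the real types; only the shape matters here.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceMessage(pub String);

#[derive(Debug, Clone, PartialEq)]
pub struct SourceReaderError(pub String);

/// Splits `(output_index, result)` pairs into one `(oks, errs)` bucket per output.
/// Pairs whose index is out of range are skipped and counted in the second
/// element of the returned tuple (an assumption of this sketch).
pub fn partition_by_output(
    updates: Vec<(usize, Result<SourceMessage, SourceReaderError>)>,
    num_outputs: usize,
) -> (Vec<(Vec<SourceMessage>, Vec<SourceReaderError>)>, usize) {
    let mut outputs = vec![(Vec::new(), Vec::new()); num_outputs];
    let mut dropped = 0;
    for (idx, result) in updates {
        match outputs.get_mut(idx) {
            // Route each result to the ok or error side of its own output.
            Some((oks, errs)) => match result {
                Ok(msg) => oks.push(msg),
                Err(err) => errs.push(err),
            },
            None => dropped += 1,
        }
    }
    (outputs, dropped)
}
```

Because the index travels next to the payload rather than inside it, the message and error types stay unaware of how many outputs exist.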
Done
6248513 to 26bda01
@petrosagg From our discussions:
I left this work in new commits rather than aggressively pushing them down to simplify review, but if there's anything to go back through CI for I'll tidy things up.
Looks great! Thanks for pushing this through. I left a comment that I think we should address so that we don't accidentally release something that puts more pressure on CRDB, but it should be an easy change.
src/storage/src/render/mod.rs
Outdated
source_data,
storage_state,
metrics,
)
};
tokens.push(token);

let health_token = crate::source::health_operator(
You can delay rendering the health operator until after this for loop. Here you can just collect all the health streams in a vector:
health_streams.push(health);
And then render a single health operator with the combined stream:
use timely::dataflow::operators::Concatenate;
if let Some(health) = health_streams.pop() {
let combined_health = health.concatenate(health_streams);
// A single operator for all the data
let health_token = crate::source::health_operator(..., combined_health);
}
This is desirable since each health operator competes with every other one to compare and append to the shard, so it's beneficial to have as few of them as possible. In fact, in a future refactor we should make it a single one per clusterd, but for now we should at least keep their number the same.
ty for this pointer; this actually means we don't need to partition out the stream at all and can just pass it through and then "demux" it in the operator itself by just keeping the output_index intact. It makes the operator more complex because it maintains state for each output, but I don't think it's too bad. Am going to merge this if it's green and can refactor later if it's too ugly to bear.
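A rough sketch of what "demuxing in the operator itself" might mean: a single consumer receives the combined `(output_index, update)` stream and keeps separate state per output. The `HealthState` struct, the variants of `HealthStatusUpdate`, and the rule of forwarding only status changes are hypothetical and chosen for illustration; the real operator's state and write path are not shown in this conversation.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the real health update type.
#[derive(Debug, Clone, PartialEq)]
pub enum HealthStatusUpdate {
    Running,
    Stalled(String),
}

/// State for a single health "operator" that serves every output of a source.
#[derive(Default)]
pub struct HealthState {
    /// Last status seen for each output index.
    last: BTreeMap<usize, HealthStatusUpdate>,
}

impl HealthState {
    /// Consumes a combined batch and returns only the updates that changed the
    /// status of their output (an assumption of this sketch).
    pub fn process(
        &mut self,
        batch: Vec<(usize, HealthStatusUpdate)>,
    ) -> Vec<(usize, HealthStatusUpdate)> {
        let mut changed = Vec::new();
        for (idx, update) in batch {
            if self.last.get(&idx) != Some(&update) {
                self.last.insert(idx, update.clone());
                changed.push((idx, update));
            }
        }
        changed
    }
}
```

The stream is never physically split here: the per-output map is the extra state the comment refers to, and there is still only one writer.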
4cc5a05 to 3912047
In preparation for creating health status shards for subsources, we need the source export information.
- Propagate primary source health status to subsources
- Log warnings on subsource errors
It looks like toxiproxy gets us to issue SuspendAndRestart commands based on the health reporting for sources. Now that subsources report their own health, we might issue this command for them. Add a check that simply prevents this from causing a panic, even though there is no actual work to be done here because subsources' tokens belong to their primary source.
b14941b to b3e79bc
Prior commits demultiplexed the health operator by means of partitioning the stream. Ultimately, we only want to create a single health operator per source, so instead only logically demultiplex the streams output in the operator itself.
b3e79bc to 820391f
Previously, an error on any subsource produced an error that prevented all subsources from being used. We can instead de-multiplex subsource errors from the replication stream into n error streams (just as we do for the ok streams). This is a huge UX win as it means one mistake doesn't wedge the entire source.
Additionally, we can include the health of the subsources in the mz_internal.mz_source_status_history table.
Motivation
This PR adds a known-desirable feature. Closes #17490
Checklist
$T ⇔ Proto$T mapping (possibly in a backwards-incompatible way) and therefore is tagged with a T-proto label.