Move `PostgresSourceReader` to `SourceReader` over `SimpleSource` #11946
gave a quick look-through (didn't get to the actual logic of the postgres source reader impl, so it's possible that some suggestions are redundant. In that case sorry!)
@@ -478,9 +527,15 @@ pub struct SourceMessage<Key, Value> {
    /// Headers, if the source is configured to pass them along. If it is, but there are none, it
    /// passes `Some([])`
    pub headers: Option<Vec<(String, Option<Vec<u8>>)>>,

    /// Allow sources to optionally output a specific differential
    /// `diff` value. Defaults to `+1`.
I'm curious why you've structured it as an arbitrary `Diff` type rather than either:
- `Option<mz_repr::Diff>`
- `mz_repr::Diff`, and having everything other than postgres set it to 1 (since they can authoritatively say that)

I don't follow (perhaps out of ignorance!) when we'd need to support a non-`mz_repr::Diff` in this field?
I had it as `Option<mz_repr::Diff>` or something similar, but I found that it made it unclear what was happening in `render_source`. With the associated type we can ensure that ONLY `SourceType::Row` ends up having `type Diff = Diff`, whereas other types always have `type Diff = ()`.
In the future this also enables us to generic-ize `DecodeResult` and clean up `render_source` even more.
so in short: yes, we want other diff types, namely `()`, to clearly communicate when a source wants to use the default behavior, as opposed to all those sources needing to implement that default themselves
> with the associated type we can ensure that ONLY SourceType::Row ends up having type Diff = Diff, where as other types always have type Diff = ()

sorry -- I don't think I follow what value this provides

> all those sources needing to implement that default themselves

but the default is just putting a `1` instead of a `()`, yeah? And currently it's a `()` that somewhere else is mapped to a `1`?

I think my concern is that it seems like extra complexity ("what is a `specific_diff` as opposed to just a regular `diff`?") that doesn't practically buy us much -- but I am happy to say that it's possible I'm just not following the benefits
With a fixed type, either `diff: i64` or `diff: Option<i64>`, the `SourceReader` impl is required to provide the "default" to us. If kafka's `SourceReader` gives us `2` over `None` or `1`, we can't really tell if that's wrong or not; we have to assume it's right.
With this, we declare, at the top of the trait impl, that the kafka source reader can produce only 1 value as a diff (in this case `()`, but we could make an empty struct called `Default` or whatever), preventing any complexity or thought in the decision when producing the `SourceMessage` inside the `next` method.
I found this to be much cleaner and easier to reason about, and less fragile. You are right that it doesn't buy us much, but I am inclined to encode invariants in the type system as much as possible, without making stuff unreadable. I found this to be more readable than the previous version (you may be able to find a commit on this pr from earlier) that didn't do it this way
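For readers following the thread, here is a minimal sketch of the associated-type approach described above. It is not the code in this PR: `RowDiff`, `IntoRowDiff`, `KafkaLike`, `PostgresLike`, and `drain` are hypothetical names, the trait is pared down to what the illustration needs, and `RowDiff` is assumed to be a signed integer standing in for `mz_repr::Diff`.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `mz_repr::Diff` (assumed to be a signed integer).
type RowDiff = i64;

/// Simplified message carrying only the fields this sketch needs.
struct SourceMessage<V, D> {
    value: V,
    /// `()` for readers that can only mean "+1", a real diff otherwise.
    specific_diff: D,
}

/// Hypothetical helper: how a reader's diff type becomes a concrete diff.
trait IntoRowDiff {
    fn into_row_diff(self) -> RowDiff;
}

impl IntoRowDiff for () {
    // The default lives here, once, instead of in every reader.
    fn into_row_diff(self) -> RowDiff {
        1
    }
}

impl IntoRowDiff for RowDiff {
    fn into_row_diff(self) -> RowDiff {
        self
    }
}

/// Pared-down reader trait; the real `SourceReader` has more items.
trait SourceReader {
    type Value;
    type Diff: IntoRowDiff;
    fn next(&mut self) -> Option<SourceMessage<Self::Value, Self::Diff>>;
}

struct KafkaLike {
    pending: VecDeque<String>,
}

impl SourceReader for KafkaLike {
    type Value = String;
    // Declared up front: this reader cannot express anything but the default.
    type Diff = ();
    fn next(&mut self) -> Option<SourceMessage<String, ()>> {
        let value = self.pending.pop_front()?;
        Some(SourceMessage { value, specific_diff: () })
    }
}

struct PostgresLike {
    pending: VecDeque<(String, RowDiff)>,
}

impl SourceReader for PostgresLike {
    type Value = String;
    // Only this reader is allowed to hand back explicit diffs.
    type Diff = RowDiff;
    fn next(&mut self) -> Option<SourceMessage<String, RowDiff>> {
        let (value, specific_diff) = self.pending.pop_front()?;
        Some(SourceMessage { value, specific_diff })
    }
}

/// Caller side: drain any reader into `(value, diff)` pairs.
fn drain<R: SourceReader>(reader: &mut R) -> Vec<(R::Value, RowDiff)> {
    let mut out = Vec::new();
    while let Some(msg) = reader.next() {
        out.push((msg.value, msg.specific_diff.into_row_diff()));
    }
    out
}
```

With this shape, a `KafkaLike` reader has no way to emit a `2` by mistake, while a `PostgresLike` reader can emit `-1` for a delete; the mapping from `()` to `1` is written in exactly one place.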
> If kafka's SourceReader gives us 2 over None or 1, we can't really tell if thats wrong or not, we have to assume its right

Isn't `KafkaSourceReader` the authoritative source for what is correct for a kafka source though? Just like the `PostgresSourceReader` is the authoritative source for the diff field for a postgres source. We're not supposing that we should protect against postgres being wrong. I think that's the thing I'm not understanding.
Hmm I guess this setup is analogous to parametrizing over a trait implemented by two enums: `enum AppendOnly { Append }` and `enum AppendOrDelete { Append, Delete }` -- which I do very clearly see the value in, so perhaps it's the naming tripping me up? I think writing those enums out is more explicit (and maybe even more clear but I was the one who came up with it so grain of salt...) but if you don't want to do that, spitballing here: what about just renaming `specific_diff` -> `update_type`?
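The two-enum analogy from this comment could be written out roughly as follows. This is only a sketch of the reviewer's suggestion, not code from the PR: the `UpdateType` trait, its `diff` method, and `net_change` are hypothetical names, and mapping `Delete` to `-1` is an assumption based on the usual differential convention.

```rust
/// Hypothetical trait: each update kind knows its differential diff.
trait UpdateType {
    fn diff(&self) -> i64;
}

/// For sources that can only ever append.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AppendOnly {
    Append,
}

/// For sources that can also retract a previously emitted row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AppendOrDelete {
    Append,
    Delete,
}

impl UpdateType for AppendOnly {
    fn diff(&self) -> i64 {
        match self {
            AppendOnly::Append => 1,
        }
    }
}

impl UpdateType for AppendOrDelete {
    fn diff(&self) -> i64 {
        match self {
            AppendOrDelete::Append => 1,
            // Assumed convention: a delete is a retraction with diff -1.
            AppendOrDelete::Delete => -1,
        }
    }
}

/// Net multiplicity change of a batch, for any update kind.
fn net_change<U: UpdateType>(updates: &[U]) -> i64 {
    updates.iter().map(|u| u.diff()).sum()
}
```

Compared with `()` versus a numeric diff, the enums name the allowed operations explicitly, at the cost of not being able to express diffs other than `+1` and `-1`.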
@cjubb39 `KafkaSourceReader` isn't really the authoritative source: it can be composed with envelopes that may or may not support non-`1` diffs; so in some sense, the caller in `render::sources::render_source` is authoritative (as it should be, it is the place where `SourceReaders` are correctly instantiated).
We can definitely change the name! `update_type` is fine, what about `type MessageDiff` and `message_diff`?
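As a rough illustration of the caller being the one that decides which diffs are acceptable, the sketch below has the caller reject non-`1` diffs for envelopes that cannot represent them. The `Envelope` enum here is a hypothetical two-variant simplification and `checked_diff` is an invented helper; the PR's actual envelope handling in `render_source` is not shown on this page.

```rust
/// Hypothetical, simplified envelope kinds for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Envelope {
    /// Passes rows through, so arbitrary diffs are meaningful.
    None,
    /// Stands in for any envelope that only understands "+1" updates.
    Upsert,
}

/// Caller-side check: return the diff to use, or panic if the
/// reader/envelope combination produced something it never should.
fn checked_diff(envelope: Envelope, diff: i64) -> i64 {
    match (envelope, diff) {
        (Envelope::None, d) => d,
        (Envelope::Upsert, 1) => 1,
        (Envelope::Upsert, d) => {
            unreachable!("envelope {:?} cannot receive diff {}", envelope, d)
        }
    }
}
```

The reader only reports what it saw; whether a `-1` is legal depends on the envelope it was composed with, which only the caller knows.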
@cjubb39 I believe I resolved all your comments!
please update the comment on the MzOffset impl below before merging! will leave the other one up to you
src/dataflow-types/src/types.rs
Outdated
@@ -1483,6 +1483,16 @@ pub mod sources {
        }
    }

    /// Convert from KafkaOffset to MzOffset (1-indexed)
update comment kafka -> pg
Thanks for making this change! Happy to be moving toward one impl for sources!
Ultimately, I think we should not have multiple `Source` traits. This is the first such change for this: switching `Postgres` off of a `SimpleSource` to a full `SourceReader`. Note that this gives us timestamp bindings for pg for free!

This pr has several subtle parts:
- `SourceReader` assumes that a message at some offset is the last such message. Pg has transactions that happen at the same timestamp, so we need to support holding back a cursor to support this. This is quite subtle, and @petrosagg may be interested in looking closely
- [...] `1`) values. This is to support both replication failure (causing a revert), and pg deletes.
- [...] `render_source` readable, forces a `diff` in the value in `DecodeResult`.
- [...] `unreachable!` for not-possible `diff` values in envelopes that don't support esoteric diffs (namely, things other than `None`. It's not 100% clear to me if `KeyEnvelope::None` is the only supported `SourceEnvelope::None`, but the code as-written does not deal with [...]
- [...] `SourceReader`. As much code as possible is preserved, and I opted for the `get_next_message` route over `next`, as we can transform all sources at the same time

Motivation
#11899
Tips for reviewer
I recommend reviewing the commits in this pr separately.
Testing
Release notes
None