-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
logical: Add support for backfilling #190
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 14 of 14 files at r1, all commit messages.
Reviewable status: all files reviewed, 7 unresolved discussions (waiting on @bobvawter and @sravotto)
internal/source/logical/config.go
line 32 at r1 (raw file):
// AllowBackfill enables the use of a non-transaction, // high-throughput mode for backfilling data sources. AllowBackfill bool
Can this not just be a duration instead of a bool?
If this is only available per dialect, then it should probably be pushed down to the dialects themselves.
internal/source/logical/config.go
line 86 at r1 (raw file):
// DefaultConsistentPoint used only by mylogical. f.BoolVar(&c.Immediate, "immediate", false, "apply data without waiting for transaction boundaries") f.IntVar(&c.FanShards, "fanShards", 16,
Why 16? Maybe base this on cpus?
internal/source/logical/factory.go
line 44 at r1 (raw file):
defer f.mu.Unlock() for _, cancel := range f.mu.cancels { cancel()
Would this be faster if the closes were all called at once? Using a waitgroup?
internal/source/logical/fan_events.go
line 29 at r1 (raw file):
config *Config fans *fan.Fans
Ok, what's the difference between, fan, fans and fanevents? Also, why the empty line here?
internal/source/logical/fan_events.go
line 73 at r1 (raw file):
} // OnRollback implements Events and resetes the enclosed fan.
nit resets
internal/source/logical/loop.go
line 224 at r1 (raw file):
// This goroutine applies the incoming mutations to the target // database. It is fragile, when it errors, we need to also
What do you mean by fragile here?
internal/source/logical/serial_events.go
line 86 at r1 (raw file):
func (e *serialEvents) stop() { if e.tx != nil { _ = e.tx.Rollback(context.Background())
Ignoring the error?
290f6c1
to
f22bbe9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTAL: Revised, with time-based backfilling approach.
Reviewable status: 3 of 25 files reviewed, 6 unresolved discussions (waiting on @BramGruneir and @sravotto)
internal/source/logical/config.go
line 32 at r1 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Can this not just be a duration instead of a bool?
If this is only available per dialect, then it should probably be pushed down to the dialects themselves.
I have pushed a new take on the configuration so that the loop looks at the approximate wall timestamp of latest consistent point (if there is one) to determine if it is sufficiently stale to start up in backfill mode.
internal/source/logical/config.go
line 86 at r1 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Why 16? Maybe base this on cpus?
Strictly speaking, this should be set based on the number of vCPUs in the target cluster. I never ran out of CPU quota for cdc-sink
before tipping over the target cluster.
internal/source/logical/factory.go
line 44 at r1 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Would this be faster if the closes were all called at once? Using a waitgroup?
I added a comment explaining that those are context-cancellation functions, so they return immediately.
internal/source/logical/fan_events.go
line 29 at r1 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Ok, what's the difference between, fan, fans and fanevents? Also, why the empty line here?
Added comments to clarify their use. The top block are set once by the provider code and the bottom block is mutable state. Sometimes, I wish golang had a final
modifier.
internal/source/logical/fan_events.go
line 73 at r1 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
nit resets
Done.
internal/source/logical/loop.go
line 224 at r1 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
What do you mean by fragile here?
I mean to say that is expected to be a regular source of errors outside of our control (e.g. network issues).
internal/source/logical/serial_events.go
line 86 at r1 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Ignoring the error?
Yes. I rarely find that a failure to rollback a transaction is worth doing anything about, since it's usually in cleanup code responding to some other error.
6696851
to
8aa3a33
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 18 of 22 files at r2, 1 of 1 files at r4, 1 of 2 files at r5, 1 of 2 files at r6, 1 of 1 files at r8, all commit messages.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @bobvawter and @sravotto)
internal/source/logical/factory.go
line 65 at r8 (raw file):
cfg := f.cfg.Copy() if cfg.LoopName == "" {
how useful is this loop name?
internal/source/logical/loop.go
line 226 at r8 (raw file):
// process once we've caught up. This routine never errors out, and // we don't need to wait for it below. if isBackfilling {
This unbounded loop concerns me. This just seems like a lot of extra work when we can just look at timestamp. You have the strategy below, is that not enough? Are you worried about thrashing?
internal/source/logical/metrics_events.go
line 38 at r8 (raw file):
commitTime = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "logical_last_commit_seconds", Help: "the original time of the most recently applied commit from the source database",
Probably worth adding a "immediate vs transaction" metric.
internal/source/mylogical/config.go
line 111 at r8 (raw file):
processID, err := strconv.ParseInt(c.LoopName, 10, 32) if err != nil { return errors.New("the LoopName must be an integer")
? Are you sure about this? Why are you parsing the loopname into an int?
internal/source/mylogical/consistent_point.go
line 158 at r8 (raw file):
} func (c *consistentPoint) MarshalJSON() ([]byte, error) {
Do you have tests for these?
8f1ef9c
to
76daacd
Compare
This change adds an explicit API for logical dialects which can support faster backfilling of data. The goal is to use fan mode during a backfill and then switch to a fully-transactional mode once the data load has been completed. The backfill mode is gated behind an --backfillWindow flag, to ensure that the operator is aware that the database may not be transactionally-consistent with respect to the source if replication lags by more than the specified window. A memoizing logical.Factory type is added to support a future use case that is best expressed as a collection of independent replication loops that share the target stack. The logical.Backfiller interface is also added to support future work. The loop type no longer implements the loop.Event interface. Instead, we introduce a fan- and a serial-mode implementation, which are chosen based on the mode in which the loop is operating. Wrappers are added to suport chaos and metrics collection. The pglogical and mylogical packages are updated to support the revised API.
76daacd
to
6bd4d10
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 16 of 26 files reviewed, 5 unresolved discussions (waiting on @BramGruneir and @sravotto)
internal/source/logical/factory.go
line 65 at r8 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
how useful is this loop name?
It's used in the Firestore integration. I'm running a loop per (source collection, target table)
pair. Each loop needs a distinct name to store its consistent-point data under. It, arguably, makes the metrics slightly more useful.
internal/source/logical/loop.go
line 226 at r8 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
This unbounded loop concerns me. This just seems like a lot of extra work when we can just look at timestamp. You have the strategy below, is that not enough? Are you worried about thrashing?
I'm not following the comment.
internal/source/logical/metrics_events.go
line 38 at r8 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Probably worth adding a "immediate vs transaction" metric.
Done.
internal/source/mylogical/config.go
line 111 at r8 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
? Are you sure about this? Why are you parsing the loopname into an int?
I don't know why the original version of config was treating the replication ID as a string to begin with. I have fixed the field type and flag.
internal/source/mylogical/consistent_point.go
line 158 at r8 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Do you have tests for these?
They were tested indirectly. There is now a specific check added to consistent_point_test.go
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concern I have over the potential thrashing can be addressed later.
Reviewed 3 of 5 files at r9, 2 of 3 files at r10, 3 of 4 files at r12, 1 of 1 files at r13, 1 of 1 files at r14, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @bobvawter, @BramGruneir, and @sravotto)
internal/source/logical/loop.go
line 226 at r8 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
I'm not following the comment.
Sorry, two separate points here. the first, is
"why is this loop required?"
The second is
If you set the backfill limit to 10 mins behind, it is possible to get thrashing (backflll -> transaction -> backfill -> transactions ... etc) if the feed is always straddling that 10 min delay?
internal/source/logical/metrics_events.go
line 38 at r8 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Done.
Thanks!
This change adds an explicit API for logical dialects which can support faster
backfilling of data. The goal is to use fan mode during a backfill and then
switch to a fully-transactional mode once the data load has been completed.
The backfill mode is gated behind an --backfillWindow flag, to ensure that the
operator is aware that the database may not be transactionally-consistent with
respect to the source if replication lags by more than the specified window.
A memoizing logical.Factory type is added to support a future use case that is
best expressed as a collection of independent replication loops that share the
target stack. The logical.Backfiller interface is also added to support future
work.
The loop type no longer implements the loop.Event interface. Instead, we
introduce a fan- and a serial-mode implementation, which are chosen based on
the mode in which the loop is operating. Wrappers are added to suport chaos and
metrics collection.
The pglogical and mylogical packages are updated to support the revised API.
This change is![Reviewable](https://camo.githubusercontent.com/23b05f5fb48215c989e92cc44cf6512512d083132bd3daf689867c8d9d386888/68747470733a2f2f72657669657761626c652e696f2f7265766965775f627574746f6e2e737667)