-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kafka: adding best effort and consistent mode. #843
Conversation
5e0e2ba
to
4f58e6b
Compare
Codecov ReportAttention: Patch coverage is
❗ Your organization needs to install the Codecov GitHub app to enable full functionality. Additional details and impacted files@@ Coverage Diff @@
## master #843 +/- ##
==========================================
- Coverage 75.91% 75.78% -0.13%
==========================================
Files 226 226
Lines 10938 10962 +24
==========================================
+ Hits 8304 8308 +4
- Misses 1902 1917 +15
- Partials 732 737 +5 ☔ View full report in Codecov by Sentry. |
4f58e6b
to
294e383
Compare
c6c4efb
to
de7bf09
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 6 files at r3, 13 of 13 files at r6, all commit messages.
Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @BramGruneir and @sravotto)
internal/source/kafka/consumer.go
line 47 at r6 (raw file):
schema ident.Schema // The target schema. timeRange hlc.Range // The time range for incoming mutations. fromState []*partitionState // The initial offsets for each partitions.
Sort order.
internal/source/kafka/consumer.go
line 69 at r6 (raw file):
session.MarkOffset(marker.topic, marker.partition, marker.offset, "start") } c.mu.done = make(map[string]bool)
Not knowing how Setup
is called, and from which goroutine(s), you should go ahead and lock the mutex to avoid any surprises later on.
internal/source/kafka/consumer.go
line 110 at r6 (raw file):
payload, err := c.accumulate(toProcess, message) if err != nil { log.Error(err)
Use log.WithError(err).Error("message")
Otherwise, the error object isn't going to be presented to the logging hooks as an error
object.
internal/source/kafka/injector.go
line 27 at r6 (raw file):
"github.com/cockroachdb/replicator/internal/conveyor" scriptRuntime "github.com/cockroachdb/replicator/internal/script" "github.com/cockroachdb/replicator/internal/sequencer/retire"
Did you run crlfmt
? I think you'd get a code-quality warning if the imports aren't sorted.
go run github.com/cockroachdb/crlfmt -w -ignore _gen.go .
internal/source/kafka/injector.go
line 53 at r6 (raw file):
tgt.Set, retire.Set, conveyor.Set,
Alphabetize.
internal/source/kafka/provider.go
line 30 at r6 (raw file):
ProvideConn, ProvideEagerConfig, ProvideConveyorConfig,
Sort.
aadec92
to
c569c3a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 10 of 13 files reviewed, 6 unresolved discussions (waiting on @bobvawter and @BramGruneir)
internal/source/kafka/consumer.go
line 47 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Sort order.
Done.
internal/source/kafka/consumer.go
line 69 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Not knowing how
Setup
is called, and from which goroutine(s), you should go ahead and lock the mutex to avoid any surprises later on.
Done.
internal/source/kafka/consumer.go
line 110 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Use
log.WithError(err).Error("message")
Otherwise, the error object isn't going to be presented to the logging hooks as anerror
object.
Done.
internal/source/kafka/injector.go
line 27 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Did you run
crlfmt
? I think you'd get a code-quality warning if the imports aren't sorted.
go run github.com/cockroachdb/crlfmt -w -ignore _gen.go .
it seems to me that they are properly sorted.
internal/source/kafka/injector.go
line 53 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Alphabetize.
Done.
internal/source/kafka/provider.go
line 30 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Sort.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 3 of 3 files at r7, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @BramGruneir and @sravotto)
internal/source/kafka/conn_test.go
line 40 at r7 (raw file):
done bool timestamps ident.Map[hlc.Time] ensure ident.Map[bool]
Sort.
This change adds transactional support to the Kafka connector for changefeeds. This change uses the conveyor package to deliver mutation to the target database in any of the supported modes of operations. The conveyor tracks resolved timestamps across all the partitions with a topic, and ensures that the checkpoint used by cdc-sink is advanced based on the minimal resolved timestamp received on all the partitions.
This change refactors the Kafka configuration, moving all the parameters required by SASL into a substructure.
c569c3a
to
57fd274
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Reviewable status: 12 of 13 files reviewed, 1 unresolved discussion (waiting on @bobvawter and @BramGruneir)
internal/source/kafka/conn_test.go
line 40 at r7 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Sort.
Done.
This change adds transactional support to the Kafka connector for changefeeds.
This change uses the conveyor package to deliver mutation to the target database in any of the supported modes of operations.
The conveyor tracks resolved timestamps across all the partitions with a topic, and ensures that the checkpoint used by cdc-sink is advanced based on the minimal resolved timestamp received on all the partitions.
A second commit, refactors the configuration for better readability.
This change is