diff --git a/internal/source/kafka/config.go b/internal/source/kafka/config.go index f7c8e489..9712f6ca 100644 --- a/internal/source/kafka/config.go +++ b/internal/source/kafka/config.go @@ -36,6 +36,10 @@ import ( "golang.org/x/oauth2/clientcredentials" ) +const ( + defaultBackfillWindow = time.Hour +) + // EagerConfig is a hack to get Wire to move userscript evaluation to // the beginning of the injector. This allows CLI flags to be set by the // script. @@ -53,15 +57,18 @@ type Config struct { TLS secure.Config BatchSize int // How many messages to accumulate before committing to the target + BestEffortWindow time.Duration // Controls switching between BestEffort and Consistent mode + BestEffortOnly bool // Force the use of BestEffort mode. Brokers []string // The address of the Kafka brokers Group string // The Kafka consumer group id. + Immediate bool // Bypass staging and write directly to the target tables. MaxTimestamp string // Only accept messages at or older than this timestamp MinTimestamp string // Only accept messages at or newer than this timestamp ResolvedInterval time.Duration // Minimal duration between resolved timestamps. Strategy string // Kafka consumer group re-balance strategy Topics []string // The list of topics that the consumer should use. - // SASL + // SASL parameters saslClientID string saslClientSecret string saslGrantType string @@ -73,11 +80,10 @@ type Config struct { // The following are computed. - // Timestamp range, computed based on minTimestamp and maxTimestamp. - timeRange hlc.Range - // The kafka connector configuration. saramaConfig *sarama.Config + // Timestamp range, computed based on minTimestamp and maxTimestamp. + timeRange hlc.Range } // Bind adds flags to the set. It delegates to the embedded Config.Bind. @@ -111,8 +117,17 @@ Please see the CREATE CHANGEFEED documentation for details.
`) f.StringVar(&c.Strategy, "strategy", "sticky", "Kafka consumer group re-balance strategy") f.StringArrayVar(&c.Topics, "topic", nil, "the topic(s) that the consumer should use") + // Operation Modes + f.DurationVar(&c.BestEffortWindow, "bestEffortWindow", defaultBackfillWindow, + "use an eventually-consistent mode for initial backfill or when replication "+ "is behind; 0 to disable") + f.BoolVar(&c.BestEffortOnly, "bestEffortOnly", false, + "eventually-consistent mode; useful for high throughput, skew-tolerant schemas with FKs") + f.BoolVar(&c.Immediate, "immediate", false, + "bypass staging tables and write directly to target; "+ "recommended only for KV-style workloads with no FKs") - // SASL + // SASL configuration f.StringVar(&c.saslClientID, "saslClientId", "", "client ID for OAuth authentication from a third-party provider") f.StringVar(&c.saslClientSecret, "saslClientSecret", "", "Client secret for OAuth authentication from a third-party provider") f.StringVar(&c.saslGrantType, "saslGrantType", "", "Override the default OAuth client credentials grant type for other implementations") diff --git a/internal/source/kafka/conn.go b/internal/source/kafka/conn.go index 5c6febd4..1f8386dd 100644 --- a/internal/source/kafka/conn.go +++ b/internal/source/kafka/conn.go @@ -20,10 +20,12 @@ import ( "time" "github.com/IBM/sarama" + "github.com/cockroachdb/cdc-sink/internal/sequencer/retire" "github.com/cockroachdb/cdc-sink/internal/sequencer/switcher" + "github.com/cockroachdb/cdc-sink/internal/staging/checkpoint" + "github.com/cockroachdb/cdc-sink/internal/target/apply" "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/hlc" - "github.com/cockroachdb/cdc-sink/internal/util/notify" "github.com/cockroachdb/cdc-sink/internal/util/stopper" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -32,28 +34,26 @@ import ( // Conn encapsulates all wire-connection behavior. It is // responsible for receiving replication messages and replying with // status updates. -// TODO (silvano) : support transactional mode -// https://github.com/cockroachdb/cdc-sink/issues/777 -// note: we get resolved timestamps on all the partitions, -// so we should be able to leverage that. -// // TODO (silvano): support Avro format, schema registry. // https://github.com/cockroachdb/cdc-sink/issues/776 -// -// TODO (silvano): add metrics. -// https://github.com/cockroachdb/cdc-sink/issues/778 + type Conn struct { - // The destination for writes. - acceptor types.MultiAcceptor + // Factory for checkpoint management. + checkpoints *checkpoint.Checkpoints // The connector configuration. config *Config - // The group id used when connecting to the broker. group sarama.ConsumerGroup // The handler that processes the events. handler sarama.ConsumerGroupHandler - // The switcher mode - mode *notify.Var[switcher.Mode] + // Utility to remove old mutations. + retire *retire.Retire + // Access to the staging database. + stagingDB *types.StagingPool + // Manage the operational mode. + switcher *switcher.Switcher + // Writes mutations to the target database. + tableAcceptor *apply.Acceptor // Access to the target database. targetDB *types.TargetPool // Access to the target schema. @@ -69,7 +69,7 @@ type offsetRange struct { // from the given topics. // If more than one process is started, the partitions within the topics // are allocated to each process based on the chosen rebalance strategy.
-func (c *Conn) Start(ctx *stopper.Context) (err error) { +func (c *Conn) Start(ctx *stopper.Context, target Target) (err error) { var start []*partitionState if c.config.MinTimestamp != "" { start, err = c.getOffsets(c.config.timeRange.Min()) @@ -81,10 +81,11 @@ func (c *Conn) Start(ctx *stopper.Context) (err error) { if err != nil { return errors.WithStack(err) } + c.handler = &Handler{ - acceptor: c.acceptor, + target: target, batchSize: c.config.BatchSize, - target: c.config.TargetSchema, + schema: c.config.TargetSchema, watchers: c.watchers, timeRange: c.config.timeRange, fromState: start, diff --git a/internal/source/kafka/conn_test.go b/internal/source/kafka/conn_test.go index 76174d98..56efdd9b 100644 --- a/internal/source/kafka/conn_test.go +++ b/internal/source/kafka/conn_test.go @@ -25,54 +25,73 @@ import ( "github.com/IBM/sarama" "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/hlc" + "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/cockroachdb/cdc-sink/internal/util/stopper" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -var _ types.MultiAcceptor = &mockAcceptor{} - -type mockAcceptor struct { +type mockTarget struct { mu struct { sync.Mutex - done bool + done bool + timestamps map[ident.Ident]hlc.Time + ensure map[ident.Ident]bool } } -func (a *mockAcceptor) done() bool { - a.mu.Lock() - defer a.mu.Unlock() - return a.mu.done -} +var _ Target = &mockTarget{} +var sentinel = hlc.New(9999, 0) -// AcceptMultiBatch implements types.MultiAcceptor. -func (a *mockAcceptor) AcceptMultiBatch( +// AcceptMultiBatch implements Target. +func (a *mockTarget) AcceptMultiBatch( _ context.Context, batch *types.MultiBatch, _ *types.AcceptOptions, ) error { a.mu.Lock() defer a.mu.Unlock() - if batch.ByTime[hlc.New(9999, 0)] != nil { + if batch.ByTime[sentinel] != nil { a.mu.done = true log.Info("AcceptMultiBatch found sentinel") } return nil } -// AcceptTableBatch implements types.MultiAcceptor. -func (a *mockAcceptor) AcceptTableBatch( - context.Context, *types.TableBatch, *types.AcceptOptions, -) error { +// Advance implements Target. +func (a *mockTarget) Advance(_ context.Context, partition ident.Ident, ts hlc.Time) error { + a.mu.Lock() + defer a.mu.Unlock() + a.mu.timestamps[partition] = ts return nil } -// AcceptTemporalBatch implements types.MultiAcceptor. -func (a *mockAcceptor) AcceptTemporalBatch( - context.Context, *types.TemporalBatch, *types.AcceptOptions, -) error { +// Ensure implements Target. +func (a *mockTarget) Ensure(_ context.Context, partitions []ident.Ident) error { + a.mu.Lock() + defer a.mu.Unlock() + for _, p := range partitions { + a.mu.ensure[p] = true + } return nil } +func (a *mockTarget) done() bool { + a.mu.Lock() + defer a.mu.Unlock() + return a.mu.done +} + +func (a *mockTarget) ensureCalled() map[ident.Ident]bool { + a.mu.Lock() + defer a.mu.Unlock() + return a.mu.ensure +} +func (a *mockTarget) timestamps() map[ident.Ident]hlc.Time { + a.mu.Lock() + defer a.mu.Unlock() + return a.mu.timestamps +} + // TestConn verifies that we can process messages using a simple mock broker. func TestConn(t *testing.T) { ctx := stopper.Background() @@ -86,10 +105,10 @@ func TestConn(t *testing.T) { sarama.NewMockSaslAuthenticateResponse(t)), "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(mb.Addr(), mb.BrokerID()). 
- SetLeader("my-topic", 0, mb.BrokerID()), + SetLeader("my-topic", 31, mb.BrokerID()), "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset("my-topic", 0, sarama.OffsetOldest, 0). - SetOffset("my-topic", 0, sarama.OffsetNewest, 1), + SetOffset("my-topic", 31, sarama.OffsetOldest, 0). + SetOffset("my-topic", 31, sarama.OffsetNewest, 1), "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). SetCoordinator(sarama.CoordinatorGroup, "my-group", mb), "HeartbeatRequest": sarama.NewMockHeartbeatResponse(t), @@ -103,24 +122,24 @@ func TestConn(t *testing.T) { &sarama.ConsumerGroupMemberAssignment{ Version: 0, Topics: map[string][]int32{ - "my-topic": {0}, + "my-topic": {31}, }, }), ), "OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t).SetOffset( - "my-group", "my-topic", 0, 0, "", sarama.ErrNoError, + "my-group", "my-topic", 31, 0, "", sarama.ErrNoError, ).SetError(sarama.ErrNoError), "FetchRequest": sarama.NewMockSequence( sarama.NewMockFetchResponse(t, 1). - SetMessage("my-topic", 0, 0, sarama.StringEncoder(`{"resolved":"1.0"}`)), + SetMessage("my-topic", 31, 0, sarama.StringEncoder(`{"resolved":"1.0"}`)), sarama.NewMockFetchResponse(t, 1). - SetMessage("my-topic", 0, 1, sarama.StringEncoder(`{"after": {"k":1, "v": "a"},"updated":"2.0"}`)), + SetMessage("my-topic", 31, 1, sarama.StringEncoder(`{"after": {"k":1, "v": "a"},"updated":"2.0"}`)), sarama.NewMockFetchResponse(t, 1). - SetMessage("my-topic", 0, 2, sarama.StringEncoder(`{"after": {"k":2, "v": "a"},"updated":"2.0"}`)), + SetMessage("my-topic", 31, 2, sarama.StringEncoder(`{"after": {"k":2, "v": "a"},"updated":"2.0"}`)), sarama.NewMockFetchResponse(t, 1). - SetMessage("my-topic", 0, 3, sarama.StringEncoder(`{"resolved":"2.0"}`)), + SetMessage("my-topic", 31, 3, sarama.StringEncoder(`{"resolved":"2.0"}`)), sarama.NewMockFetchResponse(t, 1). - SetMessage("my-topic", 0, 4, sarama.StringEncoder(`{"after": {"k":2, "v": "a"},"updated":"9999.0"}`)), + SetMessage("my-topic", 31, 4, sarama.StringEncoder(`{"after": {"k":2, "v": "a"},"updated":"9999.0"}`)), ), }) @@ -138,15 +157,24 @@ func TestConn(t *testing.T) { } err := config.preflight(ctx) a.NoError(err) - acceptor := &mockAcceptor{} + target := &mockTarget{} + target.mu.timestamps = make(map[ident.Ident]hlc.Time) + target.mu.ensure = make(map[ident.Ident]bool) + conn := &Conn{ - acceptor: acceptor, - config: config, + config: config, } - err = conn.Start(ctx) + err = conn.Start(ctx, target) r.NoError(err) - for !acceptor.done() { + for !target.done() { time.Sleep(1 * time.Second) } + + a.Equal(map[ident.Ident]bool{ + ident.New("my-topic@31"): true, + }, target.ensureCalled()) + a.Equal(map[ident.Ident]hlc.Time{ + ident.New("my-topic@31"): hlc.New(2, 0), + }, target.timestamps()) mb.Close() } diff --git a/internal/source/kafka/consumer.go b/internal/source/kafka/consumer.go index 80e128f9..74e755f8 100644 --- a/internal/source/kafka/consumer.go +++ b/internal/source/kafka/consumer.go @@ -41,9 +41,9 @@ type partitionState struct { // Handler represents a Sarama consumer group consumer type Handler struct { // The destination for writes. 
- acceptor types.MultiAcceptor + target Target batchSize int - target ident.Schema + schema ident.Schema watchers types.Watchers timeRange hlc.Range fromState []*partitionState @@ -51,14 +51,13 @@ type Handler struct { // Setup is run at the beginning of a new session, before ConsumeClaim -func (c *Handler) Setup(session sarama.ConsumerGroupSession) error { +func (h *Handler) Setup(session sarama.ConsumerGroupSession) error { - // If the startup options provide a minTimestamp we mark the offset to the provided // timestamp or the latest read message, whichever is later, for each topic and partition. // In case we restart the process, we are able to resume from the latest committed message // without changing the start up command. // TODO (silvano): Should we have a --force option to restart from the provided minTimestamp? // Using a different group id would have the same effect. - for _, marker := range c.fromState { + for _, marker := range h.fromState { log.Debugf("setup: marking offset %s@%d to %d", marker.topic, marker.partition, marker.offset) session.MarkOffset(marker.topic, marker.partition, marker.offset, "start") } @@ -66,7 +65,7 @@ func (c *Handler) Setup(session sarama.ConsumerGroupSession) error { } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited -func (c *Handler) Cleanup(session sarama.ConsumerGroupSession) error { +func (h *Handler) Cleanup(session sarama.ConsumerGroupSession) error { if session.Context().Err() != nil { log.WithError(session.Context().Err()).Error("Session terminated with an error") } @@ -74,7 +73,7 @@ func (c *Handler) Cleanup(session sarama.ConsumerGroupSession) error { } // ConsumeClaim processes new messages for the topic/partition specified in the claim. -func (c *Handler) ConsumeClaim( +func (h *Handler) ConsumeClaim( session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, ) (err error) { log.Debugf("ConsumeClaim topic=%s partition=%d offset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset()) @@ -83,6 +82,8 @@ func (c *Handler) ConsumeClaim( // Track last message received for each topic/partition. consumed := make(map[string]*sarama.ConsumerMessage) ctx := session.Context() + partition := fmt.Sprintf("%s@%d", claim.Topic(), claim.Partition()) + if err = h.target.Ensure(ctx, []ident.Ident{ident.New(partition)}); err != nil { + return err + } // Do not move the code below to a goroutine. // The `ConsumeClaim` itself is called within a goroutine, see: // https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29 @@ -95,19 +96,31 @@ func (c *Handler) ConsumeClaim( } partition := strconv.Itoa(int(claim.Partition())) mutationsReceivedCount.WithLabelValues(claim.Topic(), partition).Inc() - if err = c.accumulate(toProcess, message); err != nil { + var resolved hlc.Time + if resolved, err = h.accumulate(toProcess, message); err != nil { mutationsErrorCount.WithLabelValues(claim.Topic(), partition).Inc() return err } mutationsSuccessCount.WithLabelValues(claim.Topic(), partition).Inc() + // A resolved timestamp proposes a new checkpoint for this partition. + if resolved != hlc.Zero() { + if err = h.target.Advance(ctx, ident.New(fmt.Sprintf("%s@%d", message.Topic, message.Partition)), resolved); err != nil { + return err + } + } consumed[fmt.Sprintf("%s@%d", message.Topic, message.Partition)] = message // Flush a batch, and mark the latest message for each topic/partition as read. - if toProcess.Count() > c.batchSize { - if err = c.accept(ctx, toProcess); err != nil { + if toProcess.Count() > h.batchSize { + if err = h.accept(ctx, toProcess); err != nil { return err } toProcess = toProcess.Empty() - c.mark(session, consumed) + h.mark(session, consumed) } // Should return when `session.Context()` is done. // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: @@ -116,18 +129,19 @@ return nil case <-time.After(time.Second): // Periodically flush a batch, and mark the latest message for each topic/partition as consumed. - if err = c.accept(ctx, toProcess); err != nil { + if err = h.accept(ctx, toProcess); err != nil { + log.Error(err) return err } toProcess = toProcess.Empty() - c.mark(session, consumed) + h.mark(session, consumed) } } } // mark advances the offset on each topic/partition and removes it from the map that // tracks the latest message received on the topic/partition. -func (c *Handler) mark( +func (h *Handler) mark( session sarama.ConsumerGroupSession, consumed map[string]*sarama.ConsumerMessage, ) { for key, message := range consumed { @@ -137,13 +151,13 @@ } // accept processes a batch. -func (c *Handler) accept(ctx context.Context, toProcess *types.MultiBatch) error { +func (h *Handler) accept(ctx context.Context, toProcess *types.MultiBatch) error { if toProcess.Count() == 0 { // Nothing to do. return nil } log.Tracef("flushing %d", toProcess.Count()) - if err := c.acceptor.AcceptMultiBatch(ctx, toProcess, &types.AcceptOptions{}); err != nil { + if err := h.target.AcceptMultiBatch(ctx, toProcess, &types.AcceptOptions{}); err != nil { return err } return nil @@ -151,13 +165,17 @@ // accumulate adds the message to the batch, after converting it to a types.Mutation. -// Resolved messages are skipped. -func (c *Handler) accumulate(toProcess *types.MultiBatch, msg *sarama.ConsumerMessage) error { +// Resolved messages are not added to the batch; their timestamp is returned so the caller can advance the partition checkpoint. +func (h *Handler) accumulate(toProcess *types.MultiBatch, msg *sarama.ConsumerMessage) (hlc.Time, error) { payload, err := asPayload(msg) if err != nil { - return err + return hlc.Zero(), err } if payload.Resolved != "" { log.Tracef("Resolved %s %d [%s@%d]", payload.Resolved, msg.Timestamp.Unix(), msg.Topic, msg.Partition) + timestamp, err := hlc.Parse(payload.Resolved) + if err != nil { + return hlc.Zero(), err + } - return nil + return timestamp, nil } log.Tracef("Mutation %s %d [%s@%d]", string(msg.Key), msg.Timestamp.Unix(), msg.Topic, msg.Partition) @@ -165,15 +183,15 @@ func (c *Handler) accumulate(toProcess *types.MultiBatch, msg *sarama.ConsumerMe if err != nil { - return err + return hlc.Zero(), err } - table, qual, err := ident.ParseTableRelative(msg.Topic, c.target.Schema()) + table, qual, err := ident.ParseTableRelative(msg.Topic, h.schema.Schema()) if err != nil { - return err + return hlc.Zero(), err } // Ensure the destination table is in the target schema.
if qual != ident.TableOnly { - table = ident.NewTable(c.target.Schema(), table.Table()) + table = ident.NewTable(h.schema.Schema(), table.Table()) } - if !c.timeRange.Contains(timestamp) { - log.Debugf("skipping mutation %s@%d %s %s", string(msg.Key), msg.Offset, timestamp, c.timeRange) - return nil + if !h.timeRange.Contains(timestamp) { + log.Debugf("skipping mutation %s@%d %s %s", string(msg.Key), msg.Offset, timestamp, h.timeRange) + return hlc.Zero(), nil } diff --git a/internal/source/kafka/consumer_test.go b/internal/source/kafka/consumer_test.go index 97b2ea3f..a987f029 100644 --- a/internal/source/kafka/consumer_test.go +++ b/internal/source/kafka/consumer_test.go @@ -150,16 +150,17 @@ func TestAccumulate(t *testing.T) { toProcess := &types.MultiBatch{} consumer := &Handler{ timeRange: hlc.RangeIncluding(hlc.New(10, 0), hlc.New(13, 0)), - target: ident.MustSchema(ident.New("db"), ident.New("public")), + schema: ident.MustSchema(ident.New("db"), ident.New("public")), } for _, test := range tests { - err := consumer.accumulate(toProcess, test.msg) + _, err := consumer.accumulate(toProcess, test.msg) if test.wantErr != "" { a.Error(err) a.ErrorContains(err, test.wantErr) } else { r.NoError(err) } + } // Verify that we accumulated all the messages within the time range. a.Equal(3, len(toProcess.Data)) diff --git a/internal/source/kafka/injector.go b/internal/source/kafka/injector.go index 1e6e717b..5485d67e 100644 --- a/internal/source/kafka/injector.go +++ b/internal/source/kafka/injector.go @@ -23,10 +23,11 @@ import ( "context" scriptRuntime "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/sequencer/retire" "github.com/cockroachdb/cdc-sink/internal/sequencer/switcher" "github.com/cockroachdb/cdc-sink/internal/sinkprod" "github.com/cockroachdb/cdc-sink/internal/staging" - "github.com/cockroachdb/cdc-sink/internal/target" + tgt "github.com/cockroachdb/cdc-sink/internal/target" "github.com/cockroachdb/cdc-sink/internal/util/diag" "github.com/cockroachdb/cdc-sink/internal/util/stopper" "github.com/google/wire" @@ -46,6 +47,7 @@ func Start(ctx *stopper.Context, config *Config) (*Kafka, error) { scriptRuntime.Set, sinkprod.Set, staging.Set, - target.Set, + tgt.Set, + retire.Set, )) } diff --git a/internal/source/kafka/integration_test.go b/internal/source/kafka/integration_test.go index c0201573..4b122ec4 100644 --- a/internal/source/kafka/integration_test.go +++ b/internal/source/kafka/integration_test.go @@ -35,8 +35,9 @@ import ( ) type fixtureConfig struct { - chaos bool - script bool + chaos bool + script bool + immediate bool } const ( @@ -54,9 +55,14 @@ func TestMain(m *testing.M) { // TestKafka verifies that we can process simple messages from Kafka. // The kafka messages are generated by a CockroachDB changefeed in JSON format.
func TestKafka(t *testing.T) { - t.Run("immediate", func(t *testing.T) { testIntegration(t, &fixtureConfig{}) }) - t.Run("immediate chaos", func(t *testing.T) { testIntegration(t, &fixtureConfig{chaos: true}) }) - t.Run("immediate script", func(t *testing.T) { testIntegration(t, &fixtureConfig{script: true}) }) + + t.Run("consistent", func(t *testing.T) { testIntegration(t, &fixtureConfig{}) }) + t.Run("consistent chaos", func(t *testing.T) { testIntegration(t, &fixtureConfig{chaos: true}) }) + t.Run("consistent script", func(t *testing.T) { testIntegration(t, &fixtureConfig{script: true}) }) + + t.Run("immediate", func(t *testing.T) { testIntegration(t, &fixtureConfig{immediate: true}) }) + t.Run("immediate chaos", func(t *testing.T) { testIntegration(t, &fixtureConfig{chaos: true, immediate: true}) }) + t.Run("immediate script", func(t *testing.T) { testIntegration(t, &fixtureConfig{script: true, immediate: true}) }) } func testIntegration(t *testing.T, fc *fixtureConfig) { @@ -182,8 +188,8 @@ func getConfig(fixture *base.Fixture, fc *fixtureConfig, tgt ident.Table) (*Conf }, ApplyTimeout: 2 * time.Minute, // Increase to make using the debugger easier. }, - TargetSchema: dbName, - + TargetSchema: dbName, + Immediate: fc.immediate, BatchSize: 100, Brokers: []string{broker}, Group: dbName.Raw(), diff --git a/internal/source/kafka/metrics.go b/internal/source/kafka/metrics.go index 1015e581..0df6e96e 100644 --- a/internal/source/kafka/metrics.go +++ b/internal/source/kafka/metrics.go @@ -40,4 +40,20 @@ var ( Name: "kafka_seeks_count", Help: "the total of messages read seeking a minimum resolved timestamp", }, []string{"topic", "partition"}) + resolvedMinTimestamp = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "kafka_target_applied_timestamp_seconds", + Help: "the wall time of the most recent applied resolved timestamp", + }, []string{"target"}) + resolvedMaxTimestamp = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "kafka_target_pending_timestamp_seconds", + Help: "the wall time of the most recently received resolved timestamp", + }, []string{"target"}) + sourceLagDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "kafka_source_lag_seconds", + Help: "the age of the data received from the source changefeed", + }, []string{"target"}) + targetLagDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "kafka_target_lag_seconds", + Help: "the age of the data applied to the table", + }, []string{"target"}) ) diff --git a/internal/source/kafka/provider.go b/internal/source/kafka/provider.go index 1596c42c..2437cc86 100644 --- a/internal/source/kafka/provider.go +++ b/internal/source/kafka/provider.go @@ -18,14 +18,12 @@ package kafka import ( "github.com/cockroachdb/cdc-sink/internal/script" - "github.com/cockroachdb/cdc-sink/internal/sequencer" "github.com/cockroachdb/cdc-sink/internal/sequencer/chaos" + "github.com/cockroachdb/cdc-sink/internal/sequencer/retire" "github.com/cockroachdb/cdc-sink/internal/sequencer/switcher" + "github.com/cockroachdb/cdc-sink/internal/staging/checkpoint" "github.com/cockroachdb/cdc-sink/internal/target/apply" "github.com/cockroachdb/cdc-sink/internal/types" - "github.com/cockroachdb/cdc-sink/internal/util/hlc" - "github.com/cockroachdb/cdc-sink/internal/util/ident" - "github.com/cockroachdb/cdc-sink/internal/util/notify" "github.com/cockroachdb/cdc-sink/internal/util/stopper" "github.com/google/wire" ) @@ -45,6 +43,8 @@ func ProvideConn( sw *switcher.Switcher, chaos *chaos.Chaos, config *Config, + checkpoints *checkpoint.Checkpoints, + 
retire *retire.Retire, memo types.Memo, stagingPool *types.StagingPool, targetPool *types.TargetPool, @@ -53,34 +53,22 @@ if err := config.Preflight(ctx); err != nil { return nil, err } - // ModeImmediate is the only mode supported for now. - mode := notify.VarOf(switcher.ModeImmediate) - sw = sw.WithMode(mode) - seq, err := chaos.Wrap(ctx, sw) // No-op if probability is 0. - if err != nil { - return nil, err + + conn := &Conn{ + config: config, + targetDB: targetPool, + watchers: watchers, + checkpoints: checkpoints, + retire: retire, + stagingDB: stagingPool, + switcher: sw, + tableAcceptor: acc, } - connAcceptor, _, err := seq.Start(ctx, &sequencer.StartOptions{ - Delegate: types.OrderedAcceptorFrom(acc, watchers), - Bounds: &notify.Var[hlc.Range]{}, // Not currently used. - Group: &types.TableGroup{ - Name: ident.New(config.TargetSchema.Raw()), - Enclosing: config.TargetSchema, - }, - }) + target, err := newTarget(ctx, conn) if err != nil { return nil, err } - - ret := &Conn{ - acceptor: connAcceptor, - config: config, - mode: mode, - targetDB: targetPool, - watchers: watchers, - } - - return (*Conn)(ret), ret.Start(ctx) + return conn, conn.Start(ctx, target) } // ProvideEagerConfig is a hack to move up the evaluation of the user diff --git a/internal/source/kafka/target.go b/internal/source/kafka/target.go new file mode 100644 index 00000000..57412e87 --- /dev/null +++ b/internal/source/kafka/target.go @@ -0,0 +1,246 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package kafka + +import ( + "context" + "time" + + "github.com/cockroachdb/cdc-sink/internal/sequencer" + "github.com/cockroachdb/cdc-sink/internal/sequencer/switcher" + "github.com/cockroachdb/cdc-sink/internal/staging/checkpoint" + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/cockroachdb/cdc-sink/internal/util/hlc" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/cockroachdb/cdc-sink/internal/util/metrics" + "github.com/cockroachdb/cdc-sink/internal/util/notify" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/cockroachdb/cdc-sink/internal/util/stopvar" + log "github.com/sirupsen/logrus" +) + +// target manages the plumbing necessary to deliver changefeed messages +// to a target schema. It is also responsible for mode-switching. +type target struct { + config *Config // Connector configuration. + acceptor types.MultiAcceptor // Possibly-async writes to the target. + checkpoint *checkpoint.Group // Persistence of checkpoint (fka. resolved) timestamps + mode notify.Var[switcher.Mode] // Switchable strategies. + resolvingRange notify.Var[hlc.Range] // Range of resolved timestamps to be processed. + stat *notify.Var[sequencer.Stat] // Processing status. +} + +// Target exposes the methods required to deliver mutations, in batches, to +// the destination. It controls the checkpoint timestamp associated with each partition.
+type Target interface { + // AcceptMultiBatch processes a batch. The batch is committed to the target + // database or to a staging area, depending on the mode in which + // the connector is running. + AcceptMultiBatch(context.Context, *types.MultiBatch, *types.AcceptOptions) error + // Advance extends the proposed checkpoint timestamp associated with a partition. + // It is called when a resolved timestamp is received by the consumer. + Advance(context.Context, ident.Ident, hlc.Time) error + // Ensure that a checkpoint exists for all the given partitions. It should be + // called every time a new partition or topic is discovered by the consumer group. + Ensure(context.Context, []ident.Ident) error +} + +var _ Target = &target{} + +// AcceptMultiBatch implements Target. +func (t *target) AcceptMultiBatch(ctx context.Context, batch *types.MultiBatch, options *types.AcceptOptions) error { + return t.acceptor.AcceptMultiBatch(ctx, batch, options) +} + +// Advance implements Target. +func (t *target) Advance(ctx context.Context, partition ident.Ident, ts hlc.Time) error { + return t.checkpoint.Advance(ctx, partition, ts) +} + +// Ensure implements Target. +func (t *target) Ensure(ctx context.Context, partitions []ident.Ident) error { + return t.checkpoint.Ensure(ctx, partitions) +} + +func newTarget(ctx *stopper.Context, c *Conn) (Target, error) { + target := &target{ + config: c.config, + } + schema := c.config.TargetSchema + w, err := c.watchers.Get(schema) + if err != nil { + return nil, err + } + var tables []ident.Table + _ = w.Get().Columns.Range(func(tbl ident.Table, _ []types.ColData) error { + tables = append(tables, tbl) + return nil + }) + + tableGroup := &types.TableGroup{ + Enclosing: schema, + Name: ident.New(c.config.Group), + Tables: tables, + } + + target.checkpoint, err = c.checkpoints.Start(ctx, tableGroup, &target.resolvingRange) + if err != nil { + return nil, err + } + + // Set the mode before starting the switcher. + target.modeSelector(ctx) + + target.acceptor, target.stat, err = c.switcher.WithMode(&target.mode).Start( + ctx, + &sequencer.StartOptions{ + Bounds: &target.resolvingRange, + Delegate: types.OrderedAcceptorFrom(c.tableAcceptor, c.watchers), + Group: tableGroup, + }) + if err != nil { + return nil, err + } + + labels := metrics.SchemaValues(schema) + // Add top-of-funnel reporting. + target.acceptor = types.CountingAcceptor(target.acceptor, + mutationsErrorCount.WithLabelValues(labels...), + mutationsReceivedCount.WithLabelValues(labels...), + mutationsSuccessCount.WithLabelValues(labels...), + ) + + // Advance the stored resolved timestamps. + target.updateResolved(ctx) + + // Allow old staged mutations to be retired. + c.retire.Start(ctx, tableGroup, &target.resolvingRange) + + // Report timestamps and lag. + target.metrics(ctx) + return target, nil +} + +func (t *target) metrics(ctx *stopper.Context) { + // We use an interval to ensure that this metric will tick at a + // reasonable rate in the idle condition. + const tick = 1 * time.Second + + // Export the min and max resolved timestamps that we see and + // include lag computations. This happens on each node, regardless + // of whether it holds a lease. 
+ ctx.Go(func() error { + schema := t.config.TargetSchema.Raw() + min := resolvedMinTimestamp.WithLabelValues(schema) + max := resolvedMaxTimestamp.WithLabelValues(schema) + sourceLag := sourceLagDuration.WithLabelValues(schema) + targetLag := targetLagDuration.WithLabelValues(schema) + _, err := stopvar.DoWhenChangedOrInterval(ctx, + hlc.RangeEmpty(), &t.resolvingRange, tick, + func(ctx *stopper.Context, _, new hlc.Range) error { + min.Set(float64(new.Min().Nanos()) / 1e9) + targetLag.Set(float64(time.Now().UnixNano()-new.Min().Nanos()) / 1e9) + max.Set(float64(new.MaxInclusive().Nanos()) / 1e9) + sourceLag.Set(float64(time.Now().UnixNano()-new.MaxInclusive().Nanos()) / 1e9) + return nil + }) + return err + }) +} + +func (t *target) modeSelector(ctx *stopper.Context) { + if t.config.Immediate { + t.mode.Set(switcher.ModeImmediate) + return + } else if t.config.BestEffortOnly { + t.mode.Set(switcher.ModeBestEffort) + return + } + // The initial update will be async, so wait for it. + _, initialSet := t.mode.Get() + ctx.Go(func() error { + _, err := stopvar.DoWhenChangedOrInterval(ctx, + hlc.RangeEmptyAt(hlc.New(-1, -1)), // Pick a non-zero time so the callback fires. + &t.resolvingRange, + 10*time.Second, // Re-evaluate to allow un-jamming big serial transactions. + func(ctx *stopper.Context, _, bounds hlc.Range) error { + minTime := time.Unix(0, bounds.Min().Nanos()) + lag := time.Since(minTime) + + // Sometimes you don't know what you want. + want := switcher.ModeUnknown + if t.config.BestEffortWindow <= 0 { + // Force a consistent mode. + want = switcher.ModeConsistent + } else if lag >= t.config.BestEffortWindow { + // Fallen behind, switch to best-effort. + want = switcher.ModeBestEffort + } else if lag <= t.config.BestEffortWindow/4 { + // Caught up close-enough to the current time. + want = switcher.ModeConsistent + } + + _, _, _ = t.mode.Update(func(current switcher.Mode) (switcher.Mode, error) { + // Pick a reasonable default for uninitialized case. + // Choosing BestEffort here allows us to optimize + // for the case where a user creates a changefeed + // that's going to perform a large backfill. + if current == switcher.ModeUnknown && want == switcher.ModeUnknown { + want = switcher.ModeBestEffort + } else if want == switcher.ModeUnknown || current == want { + // No decision above or no change. + return current, notify.ErrNoUpdate + } + + log.Tracef("setting group %s mode to %s", t.config.TargetSchema, want) + return want, nil + }) + return nil + }) + return err + }) + <-initialSet +} + +// updateResolved will monitor the timestamp to which tables in the +// group have advanced and update the resolved timestamp table. +func (t *target) updateResolved(ctx *stopper.Context) { + ctx.Go(func() error { + _, err := stopvar.DoWhenChanged(ctx, + nil, + t.stat, + func(ctx *stopper.Context, old, new sequencer.Stat) error { + oldMin := sequencer.CommonProgress(old) + newMin := sequencer.CommonProgress(new) + + // Not an interesting change since the minimum didn't advance. + if oldMin == newMin { + return nil + } + // Mark the range as being complete. 
+ if err := t.checkpoint.Commit(ctx, newMin); err != nil { + log.WithError(err).Warnf( + "could not store updated resolved timestamp for %s; will continue", + t.config.TargetSchema, + ) + } + return nil + }) + return err + }) +} diff --git a/internal/source/kafka/wire_gen.go b/internal/source/kafka/wire_gen.go index 84b9a669..71308041 100644 --- a/internal/source/kafka/wire_gen.go +++ b/internal/source/kafka/wire_gen.go @@ -1,6 +1,6 @@ // Code generated by Wire. DO NOT EDIT. -//go:generate go run -mod=mod github.com/google/wire/cmd/wire +//go:generate go run github.com/google/wire/cmd/wire //go:build !wireinject // +build !wireinject @@ -12,10 +12,12 @@ import ( "github.com/cockroachdb/cdc-sink/internal/sequencer/chaos" "github.com/cockroachdb/cdc-sink/internal/sequencer/core" "github.com/cockroachdb/cdc-sink/internal/sequencer/immediate" + "github.com/cockroachdb/cdc-sink/internal/sequencer/retire" "github.com/cockroachdb/cdc-sink/internal/sequencer/scheduler" script2 "github.com/cockroachdb/cdc-sink/internal/sequencer/script" "github.com/cockroachdb/cdc-sink/internal/sequencer/switcher" "github.com/cockroachdb/cdc-sink/internal/sinkprod" + "github.com/cockroachdb/cdc-sink/internal/staging/checkpoint" "github.com/cockroachdb/cdc-sink/internal/staging/leases" "github.com/cockroachdb/cdc-sink/internal/staging/memo" "github.com/cockroachdb/cdc-sink/internal/staging/stage" @@ -89,11 +91,16 @@ func Start(ctx *stopper.Context, config *Config) (*Kafka, error) { chaosChaos := &chaos.Chaos{ Config: sequencerConfig, } + checkpoints, err := checkpoint.ProvideCheckpoints(ctx, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + retireRetire := retire.ProvideRetire(sequencerConfig, stagingPool, stagers) memoMemo, err := memo.ProvideMemo(ctx, stagingPool, stagingSchema) if err != nil { return nil, err } - conn, err := ProvideConn(ctx, acceptor, switcherSwitcher, chaosChaos, config, memoMemo, stagingPool, targetPool, watchers) + conn, err := ProvideConn(ctx, acceptor, switcherSwitcher, chaosChaos, config, checkpoints, retireRetire, memoMemo, stagingPool, targetPool, watchers) if err != nil { return nil, err }
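The changefeed payloads exercised by this patch come in exactly two JSON shapes: row envelopes such as {"after": {"k":1, "v": "a"},"updated":"2.0"} and resolved markers such as {"resolved":"2.0"} (see the mock fetch responses in conn_test.go above). The stand-alone sketch below is illustrative only; it uses plain encoding/json rather than the patch's own asPayload helper, but it shows the branch the handler takes: a resolved marker advances the partition checkpoint, anything else is accumulated as a mutation.

package main

import (
	"encoding/json"
	"fmt"
)

// envelope mirrors the changefeed JSON used in the tests above.
// A row message carries "after" and "updated"; a resolved message
// carries only "resolved".
type envelope struct {
	After    json.RawMessage `json:"after"`
	Updated  string          `json:"updated"`
	Resolved string          `json:"resolved"`
}

func classify(raw []byte) (string, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", err
	}
	if env.Resolved != "" {
		// Resolved timestamps are not mutations; they propose a new
		// checkpoint for the partition they arrived on.
		return "resolved@" + env.Resolved, nil
	}
	// Everything else is a row mutation destined for the batch.
	return "mutation@" + env.Updated, nil
}

func main() {
	for _, msg := range []string{
		`{"after": {"k":1, "v": "a"},"updated":"2.0"}`,
		`{"resolved":"2.0"}`,
	} {
		kind, err := classify([]byte(msg))
		if err != nil {
			panic(err)
		}
		fmt.Println(kind) // mutation@2.0, then resolved@2.0
	}
}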
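modeSelector in target.go switches the sequencer between Consistent and BestEffort based on how far the resolved-timestamp range lags wall time: a lag of at least bestEffortWindow drops to BestEffort, and the connector only returns to Consistent once the lag shrinks below a quarter of the window, so it does not flap at the boundary. A minimal sketch of that rule as a pure function; plain strings stand in for switcher.Mode and the thresholds are the ones used in the hunk above.

package main

import (
	"fmt"
	"time"
)

// chooseMode restates the decision rule from modeSelector: immediate and
// bestEffortOnly are hard overrides, otherwise the replication lag is
// compared against the BestEffortWindow. An empty result means "leave the
// current mode unchanged" (the hysteresis band).
func chooseMode(immediate, bestEffortOnly bool, lag, window time.Duration) string {
	switch {
	case immediate:
		return "immediate"
	case bestEffortOnly:
		return "best-effort"
	case window <= 0:
		return "consistent" // switching disabled; always run consistent
	case lag >= window:
		return "best-effort" // fallen behind: favor throughput
	case lag <= window/4:
		return "consistent" // caught up close enough: favor transactions
	default:
		return "" // inside the hysteresis band: no change
	}
}

func main() {
	window := time.Hour // defaultBackfillWindow in config.go
	for _, lag := range []time.Duration{2 * time.Hour, 30 * time.Minute, 5 * time.Minute} {
		fmt.Printf("lag=%s -> %q\n", lag, chooseMode(false, false, lag, window))
	}
	// Prints: best-effort, "" (unchanged), consistent.
}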
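Reading the mock fetch sequence in conn_test.go together with its assertions gives the expected conversation between the consumer and the Target for partition my-topic@31: the checkpoint row is ensured when the claim starts, each {"resolved": ...} marker proposes a new checkpoint, and row messages are flushed as batches in between. The stand-alone trace below is one possible interleaving, not the asserted one; it uses string stand-ins for ident.Ident and hlc.Time, and the exact flush points depend on the batch size and the one-second timer.

package main

import "fmt"

// recorder stands in for the Target interface with simplified signatures;
// it just records the order of calls the consumer is expected to make.
type recorder struct{ calls []string }

func (r *recorder) Ensure(partition string) {
	r.calls = append(r.calls, "Ensure("+partition+")")
}

func (r *recorder) Advance(partition, resolved string) {
	r.calls = append(r.calls, "Advance("+partition+", "+resolved+")")
}

func (r *recorder) AcceptMultiBatch(rows int) {
	r.calls = append(r.calls, fmt.Sprintf("AcceptMultiBatch(%d rows)", rows))
}

func main() {
	t := &recorder{}
	t.Ensure("my-topic@31")         // claim starts: checkpoint row must exist
	t.Advance("my-topic@31", "1.0") // {"resolved":"1.0"}
	t.AcceptMultiBatch(2)           // the two rows with updated=2.0, flushed by the timer
	t.Advance("my-topic@31", "2.0") // {"resolved":"2.0"} -> the value TestConn asserts
	t.AcceptMultiBatch(1)           // the row at updated=9999.0, the test's stop sentinel
	for _, c := range t.calls {
		fmt.Println(c)
	}
}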