Skip to content

Commit

Permalink
kafka: adding best effort and transactional consistent modes.
Browse files Browse the repository at this point in the history
This change adds transactional support to the Kafka connector for CRDB
changefeeds.

Changefeeds broadcast each resolved timestamp to every partition, so cdc-sink
advances its checkpoint to the minimum resolved timestamp received across all
partitions.

Closes #777.
  • Loading branch information
sravotto committed Apr 30, 2024
1 parent a2144ad commit 924ddb4
Show file tree
Hide file tree
Showing 11 changed files with 444 additions and 116 deletions.
25 changes: 20 additions & 5 deletions internal/source/kafka/config.go
Expand Up @@ -36,6 +36,10 @@ import (
"golang.org/x/oauth2/clientcredentials"
)

// defaultBackfillWindow is the default value of the bestEffortWindow flag.
const defaultBackfillWindow = time.Hour

// EagerConfig is a hack to get Wire to move userscript evaluation to
// the beginning of the injector. This allows CLI flags to be set by the
// script.
Expand All @@ -53,15 +57,18 @@ type Config struct {
TLS secure.Config

BatchSize int // How many messages to accumulate before committing to the target
BestEffortWindow time.Duration // Controls switching between BestEffort and Consistent mode
BestEffortOnly bool // Force the use of BestEffort mode.
Brokers []string // The address of the Kafka brokers
Group string // the Kafka consumer group id.
Immediate bool // Write directly to staging tables.
MaxTimestamp string // Only accept messages at or older than this timestamp
MinTimestamp string // Only accept messages at or newer than this timestamp
ResolvedInterval time.Duration // Minimal duration between resolved timestamps.
Strategy string // Kafka consumer group re-balance strategy
Topics []string // The list of topics that the consumer should use.

// SASL
// SASL parameters
saslClientID string
saslClientSecret string
saslGrantType string
Expand All @@ -73,11 +80,10 @@ type Config struct {

// The following are computed.

// Timestamp range, computed based on minTimestamp and maxTimestamp.
timeRange hlc.Range

// The kafka connector configuration.
saramaConfig *sarama.Config
// Timestamp range, computed based on minTimestamp and maxTimestamp.
timeRange hlc.Range
}

// Bind adds flags to the set. It delegates to the embedded Config.Bind.
Expand Down Expand Up @@ -111,8 +117,17 @@ Please see the CREATE CHANGEFEED documentation for details.
`)
f.StringVar(&c.Strategy, "strategy", "sticky", "Kafka consumer group re-balance strategy")
f.StringArrayVar(&c.Topics, "topic", nil, "the topic(s) that the consumer should use")
// Operation Modes
f.DurationVar(&c.BestEffortWindow, "bestEffortWindow", defaultBackfillWindow,
"use an eventually-consistent mode for initial backfill or when replication "+
"is behind; 0 to disable")
f.BoolVar(&c.BestEffortOnly, "bestEffortOnly", false,
"eventually-consistent mode; useful for high throughput, skew-tolerant schemas with FKs")
f.BoolVar(&c.Immediate, "immediate", false,
"bypass staging tables and write directly to target; "+
"recommended only for KV-style workloads with no FKs")

// SASL
// SASL configuration
f.StringVar(&c.saslClientID, "saslClientId", "", "client ID for OAuth authentication from a third-party provider")
f.StringVar(&c.saslClientSecret, "saslClientSecret", "", "Client secret for OAuth authentication from a third-party provider")
f.StringVar(&c.saslGrantType, "saslGrantType", "", "Override the default OAuth client credentials grant type for other implementations")
Expand Down
35 changes: 18 additions & 17 deletions internal/source/kafka/conn.go
Expand Up @@ -20,10 +20,12 @@ import (
"time"

"github.com/IBM/sarama"
"github.com/cockroachdb/cdc-sink/internal/sequencer/retire"
"github.com/cockroachdb/cdc-sink/internal/sequencer/switcher"
"github.com/cockroachdb/cdc-sink/internal/staging/checkpoint"
"github.com/cockroachdb/cdc-sink/internal/target/apply"
"github.com/cockroachdb/cdc-sink/internal/types"
"github.com/cockroachdb/cdc-sink/internal/util/hlc"
"github.com/cockroachdb/cdc-sink/internal/util/notify"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand All @@ -32,28 +34,26 @@ import (
// Conn encapsulates all wire-connection behavior. It is
// responsible for receiving replication messages and replying with
// status updates.
// TODO (silvano) : support transactional mode
// https://github.com/cockroachdb/cdc-sink/issues/777
// note: we get resolved timestamps on all the partitions,
// so we should be able to leverage that.
//
// TODO (silvano): support Avro format, schema registry.
// https://github.com/cockroachdb/cdc-sink/issues/776
//
// TODO (silvano): add metrics.
// https://github.com/cockroachdb/cdc-sink/issues/778

type Conn struct {
// The destination for writes.
acceptor types.MultiAcceptor
// Factory for checkpoint management
checkpoints *checkpoint.Checkpoints
// The connector configuration.
config *Config

// The group id used when connecting to the broker.
group sarama.ConsumerGroup
// The handler that processes the events.
handler sarama.ConsumerGroupHandler
// The switcher mode
mode *notify.Var[switcher.Mode]
// Utility to remove old mutations
retire *retire.Retire
// Access to the staging database.
stagingDB *types.StagingPool
// Manage the operational mode.
switcher *switcher.Switcher
// Writes mutation to the target database.
tableAcceptor *apply.Acceptor
// Access to the target database.
targetDB *types.TargetPool
// Access to the target schema.
Expand All @@ -69,7 +69,7 @@ type offsetRange struct {
// from the given topics.
// If more than one process is started, the partitions within the topics
// are allocated to each process based on the chosen rebalance strategy.
func (c *Conn) Start(ctx *stopper.Context) (err error) {
func (c *Conn) Start(ctx *stopper.Context, target Target) (err error) {
var start []*partitionState
if c.config.MinTimestamp != "" {
start, err = c.getOffsets(c.config.timeRange.Min())
Expand All @@ -81,10 +81,11 @@ func (c *Conn) Start(ctx *stopper.Context) (err error) {
if err != nil {
return errors.WithStack(err)
}

c.handler = &Handler{
acceptor: c.acceptor,
target: target,
batchSize: c.config.BatchSize,
target: c.config.TargetSchema,
schema: c.config.TargetSchema,
watchers: c.watchers,
timeRange: c.config.timeRange,
fromState: start,
Expand Down
98 changes: 63 additions & 35 deletions internal/source/kafka/conn_test.go
Expand Up @@ -25,54 +25,73 @@ import (
"github.com/IBM/sarama"
"github.com/cockroachdb/cdc-sink/internal/types"
"github.com/cockroachdb/cdc-sink/internal/util/hlc"
"github.com/cockroachdb/cdc-sink/internal/util/ident"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var _ types.MultiAcceptor = &mockAcceptor{}

type mockAcceptor struct {
type mockTarget struct {
mu struct {
sync.Mutex
done bool
done bool
timestamps map[ident.Ident]hlc.Time
ensure map[ident.Ident]bool
}
}

// done reports the current value of the mutex-guarded done flag.
func (a *mockAcceptor) done() bool {
	a.mu.Lock()
	finished := a.mu.done
	a.mu.Unlock()
	return finished
}
var (
	// Compile-time check that *mockTarget satisfies Target.
	_ Target = (*mockTarget)(nil)

	// sentinel is the timestamp that AcceptMultiBatch looks for in an
	// incoming batch in order to set the done flag.
	sentinel = hlc.New(9999, 0)
)

// AcceptMultiBatch implements types.MultiAcceptor.
func (a *mockAcceptor) AcceptMultiBatch(
// AcceptMultiBatch implements Target.
func (a *mockTarget) AcceptMultiBatch(
_ context.Context, batch *types.MultiBatch, _ *types.AcceptOptions,
) error {
a.mu.Lock()
defer a.mu.Unlock()
if batch.ByTime[hlc.New(9999, 0)] != nil {
if batch.ByTime[sentinel] != nil {
a.mu.done = true
log.Info("AcceptMultiBatch found sentinel")
}
return nil
}

// AcceptTableBatch implements types.MultiAcceptor.
func (a *mockAcceptor) AcceptTableBatch(
context.Context, *types.TableBatch, *types.AcceptOptions,
) error {
// Advance implements Target. It records ts as the latest timestamp seen
// for the given partition, replacing any previously stored value, and
// always returns nil.
func (a *mockTarget) Advance(_ context.Context, partition ident.Ident, ts hlc.Time) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	// Allocate lazily so that a zero-value mockTarget is usable; assigning
	// into a nil map would panic.
	if a.mu.timestamps == nil {
		a.mu.timestamps = make(map[ident.Ident]hlc.Time)
	}
	a.mu.timestamps[partition] = ts
	return nil
}

// AcceptTemporalBatch implements types.MultiAcceptor.
func (a *mockAcceptor) AcceptTemporalBatch(
context.Context, *types.TemporalBatch, *types.AcceptOptions,
) error {
// Ensure implements Target. It marks every partition in partitions as
// having been requested and always returns nil.
func (a *mockTarget) Ensure(_ context.Context, partitions []ident.Ident) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	// Allocate lazily so that a zero-value mockTarget is usable; assigning
	// into a nil map would panic.
	if a.mu.ensure == nil {
		a.mu.ensure = make(map[ident.Ident]bool, len(partitions))
	}
	for _, p := range partitions {
		a.mu.ensure[p] = true
	}
	return nil
}

// done reports whether the done flag has been set, i.e. whether
// AcceptMultiBatch has received a batch containing the sentinel timestamp.
func (a *mockTarget) done() bool {
	a.mu.Lock()
	finished := a.mu.done
	a.mu.Unlock()
	return finished
}

// ensureCalled returns a snapshot of the partitions recorded by Ensure.
//
// The copy is made while holding the mutex, so the caller never reads the
// live map after the lock has been released. A nil internal map yields a
// nil result.
func (a *mockTarget) ensureCalled() map[ident.Ident]bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.mu.ensure == nil {
		return nil
	}
	snapshot := make(map[ident.Ident]bool, len(a.mu.ensure))
	for partition, seen := range a.mu.ensure {
		snapshot[partition] = seen
	}
	return snapshot
}
// timestamps returns a snapshot of the per-partition timestamps recorded
// by Advance.
//
// The copy is made while holding the mutex, so the caller never reads the
// live map after the lock has been released. A nil internal map yields a
// nil result.
func (a *mockTarget) timestamps() map[ident.Ident]hlc.Time {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.mu.timestamps == nil {
		return nil
	}
	snapshot := make(map[ident.Ident]hlc.Time, len(a.mu.timestamps))
	for partition, ts := range a.mu.timestamps {
		snapshot[partition] = ts
	}
	return snapshot
}

// TestConn verifies that we can process messages using a simple mock broker.
func TestConn(t *testing.T) {
ctx := stopper.Background()
Expand All @@ -86,10 +105,10 @@ func TestConn(t *testing.T) {
sarama.NewMockSaslAuthenticateResponse(t)),
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(mb.Addr(), mb.BrokerID()).
SetLeader("my-topic", 0, mb.BrokerID()),
SetLeader("my-topic", 31, mb.BrokerID()),
"OffsetRequest": sarama.NewMockOffsetResponse(t).
SetOffset("my-topic", 0, sarama.OffsetOldest, 0).
SetOffset("my-topic", 0, sarama.OffsetNewest, 1),
SetOffset("my-topic", 31, sarama.OffsetOldest, 0).
SetOffset("my-topic", 31, sarama.OffsetNewest, 1),
"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
SetCoordinator(sarama.CoordinatorGroup, "my-group", mb),
"HeartbeatRequest": sarama.NewMockHeartbeatResponse(t),
Expand All @@ -103,24 +122,24 @@ func TestConn(t *testing.T) {
&sarama.ConsumerGroupMemberAssignment{
Version: 0,
Topics: map[string][]int32{
"my-topic": {0},
"my-topic": {31},
},
}),
),
"OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(t).SetOffset(
"my-group", "my-topic", 0, 0, "", sarama.ErrNoError,
"my-group", "my-topic", 31, 0, "", sarama.ErrNoError,
).SetError(sarama.ErrNoError),
"FetchRequest": sarama.NewMockSequence(
sarama.NewMockFetchResponse(t, 1).
SetMessage("my-topic", 0, 0, sarama.StringEncoder(`{"resolved":"1.0"}`)),
SetMessage("my-topic", 31, 0, sarama.StringEncoder(`{"resolved":"1.0"}`)),
sarama.NewMockFetchResponse(t, 1).
SetMessage("my-topic", 0, 1, sarama.StringEncoder(`{"after": {"k":1, "v": "a"},"updated":"2.0"}`)),
SetMessage("my-topic", 31, 1, sarama.StringEncoder(`{"after": {"k":1, "v": "a"},"updated":"2.0"}`)),
sarama.NewMockFetchResponse(t, 1).
SetMessage("my-topic", 0, 2, sarama.StringEncoder(`{"after": {"k":2, "v": "a"},"updated":"2.0"}`)),
SetMessage("my-topic", 31, 2, sarama.StringEncoder(`{"after": {"k":2, "v": "a"},"updated":"2.0"}`)),
sarama.NewMockFetchResponse(t, 1).
SetMessage("my-topic", 0, 3, sarama.StringEncoder(`{"resolved":"2.0"}`)),
SetMessage("my-topic", 31, 3, sarama.StringEncoder(`{"resolved":"2.0"}`)),
sarama.NewMockFetchResponse(t, 1).
SetMessage("my-topic", 0, 4, sarama.StringEncoder(`{"after": {"k":2, "v": "a"},"updated":"9999.0"}`)),
SetMessage("my-topic", 31, 4, sarama.StringEncoder(`{"after": {"k":2, "v": "a"},"updated":"9999.0"}`)),
),
})

Expand All @@ -138,15 +157,24 @@ func TestConn(t *testing.T) {
}
err := config.preflight(ctx)
a.NoError(err)
acceptor := &mockAcceptor{}
target := &mockTarget{}
target.mu.timestamps = make(map[ident.Ident]hlc.Time)
target.mu.ensure = make(map[ident.Ident]bool)

conn := &Conn{
acceptor: acceptor,
config: config,
config: config,
}
err = conn.Start(ctx)
err = conn.Start(ctx, target)
r.NoError(err)
for !acceptor.done() {
for !target.done() {
time.Sleep(1 * time.Second)
}

a.Equal(map[ident.Ident]bool{
ident.New("my-topic@31"): true,
}, target.ensureCalled())
a.Equal(map[ident.Ident]hlc.Time{
ident.New("my-topic@31"): hlc.New(2, 0),
}, target.timestamps())
mb.Close()
}

0 comments on commit 924ddb4

Please sign in to comment.