Skip to content

Commit

Permalink
logical: Add support for backfilling
Browse files Browse the repository at this point in the history
This change adds an explicit API for logical dialects which can support faster
backfilling of data. The goal is to use fan mode during a backfill and then
switch to a fully-transactional mode once the data load has been completed.

The backfill mode is gated behind an --backfillWindow flag, to ensure that the
operator is aware that the database may not be transactionally-consistent with
respect to the source if replication lags by more than the specified window.

A memoizing logical.Factory type is added to support a future use case that is
best expressed as a collection of independent replication loops that share the
target stack. The logical.Backfiller interface is also added to support future
work.

The loop type no longer implements the loop.Event interface. Instead, we
introduce a fan- and a serial-mode implementation, which are chosen based on
the mode in which the loop is operating. Wrappers are added to suport chaos and
metrics collection.

The pglogical and mylogical packages are updated to support the revised API.
  • Loading branch information
bobvawter committed Aug 2, 2022
1 parent becd684 commit 6696851
Show file tree
Hide file tree
Showing 25 changed files with 912 additions and 468 deletions.
66 changes: 41 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ scrape_configs:
- targets: [ '127.0.0.1:30004' ]
```

### Backfill mode

The `--backfillWindow` flag can be used with logical-replication modes to ignore source transaction
boundaries if the replication lag exceeds a certain threshold or when first populating data into the
target database. This flag is useful when `cdc-sink` is not expected to be run continuously and will
need to catch up to the current state in the source database.

### Immediate mode

Immediate mode writes incoming mutations to the target schema as soon as they are received, instead
Expand Down Expand Up @@ -369,18 +376,23 @@ Usage:
cdc-sink pglogical [flags]
Flags:
--applyTimeout duration the maximum amount of time to wait for an update to be applied (default 30s)
--bytesInFlight int apply backpressure when amount of in-flight mutation data reaches this limit (default 10485760)
-h, --help help for pglogical
--immediate apply data without waiting for transaction boundaries
--metricsAddr string a host:port to serve metrics from at /_/varz
--publicationName string the publication within the source database to replicate
--retryDelay duration the amount of time to sleep between replication retries (default 10s)
--slotName string the replication slot in the source database (default "cdc_sink")
--sourceConn string the source database's connection string
--targetConn string the target cluster's connection string
--targetDB string the SQL database in the target cluster to update
--targetDBConns int the maximum pool size to the target cluster (default 1024)
--applyTimeout duration the maximum amount of time to wait for an update to be applied (default 30s)
--backfillWindow duration use a high-throughput, but non-transactional mode if replication is this far behind
--bytesInFlight int apply backpressure when amount of in-flight mutation data reaches this limit (default 10485760)
--fanShards int the number of concurrent connections to use when writing data in fan mode (default 16)
-h, --help help for pglogical
--immediate apply data without waiting for transaction boundaries
--loopName string identify the replication loop in metrics (default "pglogical")
--metricsAddr string a host:port to serve metrics from at /_/varz
--publicationName string the publication within the source database to replicate
--retryDelay duration the amount of time to sleep between replication retries (default 10s)
--slotName string the replication slot in the source database (default "cdc_sink")
--sourceConn string the source database's connection string
--stagingDB string a SQL database to store metadata in (default "_cdc_sink")
--standbyTimeout duration how often to commit the consistent point (default 5s)
--targetConn string the target cluster's connection string
--targetDB string the SQL database in the target cluster to update
--targetDBConns int the maximum pool size to the target cluster (default 1024)
Global Flags:
--logDestination string write logs to a file, instead of stdout
Expand Down Expand Up @@ -442,18 +454,22 @@ Usage:
cdc-sink mylogical [flags]
Flags:
--applyTimeout duration the maximum amount of time to wait for an update to be applied (default 30s)
--bytesInFlight int apply backpressure when amount of in-flight mutation data reaches this limit (default 10485760)
--consistentPointKey string unique key used for this process to persist state information
--defaultGTIDSet string default GTIDSet. Used if no state is persisted
-h, --help help for mylogical
--immediate apply data without waiting for transaction boundaries
--metricsAddr string a host:port to serve metrics from at /_/varz
--retryDelay duration the amount of time to sleep between replication retries (default 10s)
--sourceConn string the source database's connection string
--targetConn string the target cluster's connection string
--targetDB string the SQL database in the target cluster to update
--targetDBConns int the maximum pool size to the target cluster (default 1024)
--applyTimeout duration the maximum amount of time to wait for an update to be applied (default 30s)
--backfillWindow duration use a high-throughput, but non-transactional mode if replication is this far behind
--bytesInFlight int apply backpressure when amount of in-flight mutation data reaches this limit (default 10485760)
--defaultGTIDSet string default GTIDSet. Used if no state is persisted
--fanShards int the number of concurrent connections to use when writing data in fan mode (default 16)
-h, --help help for mylogical
--immediate apply data without waiting for transaction boundaries
--loopName string identify the replication loop in metrics (default "mylogical")
--metricsAddr string a host:port to serve metrics from at /_/varz
--retryDelay duration the amount of time to sleep between replication retries (default 10s)
--sourceConn string the source database's connection string
--stagingDB string a SQL database to store metadata in (default "_cdc_sink")
--standbyTimeout duration how often to commit the consistent point (default 5s)
--targetConn string the target cluster's connection string
--targetDB string the SQL database in the target cluster to update
--targetDBConns int the maximum pool size to the target cluster (default 1024)
Global Flags:
--logDestination string write logs to a file, instead of stdout
Expand Down Expand Up @@ -521,7 +537,7 @@ SET GLOBAL gtid_slave_pos='0-1-1';

- Import the database into Cockroach DB, following the instructions
at [Migrate from MySQL](https://www.cockroachlabs.com/docs/stable/migrate-from-mysql.html).
- Run `cdc-sink mylogical` with at least the `--sourceConn`, `--targetConn`, `--consistentPointKey`
- Run `cdc-sink mylogical` with at least the `--sourceConn`, `--targetConn`
, `--defaultGTIDSet` and `--targetDB`. Set `--defaultGTIDSet` to the GTID state shown above.

## Security Considerations
Expand Down
12 changes: 10 additions & 2 deletions internal/source/logical/chaos.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func (d *chaosDialect) Process(ctx context.Context, ch <-chan Message, events Ev
return d.delegate.Process(ctx, ch, &chaosEvents{events, d.prob})
}

func (d *chaosDialect) UnmarshalStamp(stamp []byte) (stamp.Stamp, error) {
return d.delegate.UnmarshalStamp(stamp)
func (d *chaosDialect) ZeroStamp() stamp.Stamp {
return d.delegate.ZeroStamp()
}

type chaosEvents struct {
Expand Down Expand Up @@ -103,3 +103,11 @@ func (e *chaosEvents) OnRollback(ctx context.Context, msg Message) error {
}
return e.delegate.OnRollback(ctx, msg)
}

func (e *chaosEvents) stop() {
e.delegate.stop()
}

func (e *chaosEvents) setConsistentPoint(s stamp.Stamp) {
e.delegate.setConsistentPoint(s)
}
32 changes: 28 additions & 4 deletions internal/source/logical/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type Config struct {
// The maximum length of time to wait for an incoming transaction
// to settle (i.e. to detect stalls in the target database).
ApplyTimeout time.Duration
// BackfillWindow enables the use of fan mode for backfilling data
// sources if the consistent point is older than the specified
// duration. A zero value disables the use of backfill mode.
BackfillWindow time.Duration
// The maximum number of raw tuple-data that has yet to be applied
// to the target database. This will act as an approximate upper
// bound on the amount of in-memory tuple data by pausing the
Expand All @@ -38,17 +42,22 @@ type Config struct {
BytesInFlight int
// Used in testing to inject errors during processing.
ChaosProb float32
// If present, the key used to persist consistent point identifiers.
ConsistentPointKey string
// The default Consistent Point to use for replication.
// Consistent Point persisted in the target database will be used, if available.
DefaultConsistentPoint string
// The number of concurrent connections to use when writing data in
// fan mode.
FanShards int
// Place the configuration into immediate mode, where mutations are
// applied without waiting for transaction boundaries.
Immediate bool
// If present, uniquely identifies the replication loop.
LoopName string
// The amount of time to sleep between replication-loop retries.
// If zero, a default value will be used.
RetryDelay time.Duration
// How often to commit the latest consistent point.
StandbyTimeout time.Duration
// The name of a SQL database in the target cluster to store
// metadata in.
StagingDB ident.Ident
Expand All @@ -68,26 +77,41 @@ type Config struct {
func (c *Config) Bind(f *pflag.FlagSet) {
f.DurationVar(&c.ApplyTimeout, "applyTimeout", 30*time.Second,
"the maximum amount of time to wait for an update to be applied")
f.DurationVar(&c.BackfillWindow, "backfillWindow", 0,
"use a high-throughput, but non-transactional mode if replication is this far behind")
f.IntVar(&c.BytesInFlight, "bytesInFlight", 10*1024*1024,
"apply backpressure when amount of in-flight mutation data reaches this limit")
// ConsistentPointKey used only by mylogical.
// DefaultConsistentPoint used only by mylogical.
// LoopName bound by dialect packages.
// DefaultConsistentPoint bound by dialect packages.
f.BoolVar(&c.Immediate, "immediate", false, "apply data without waiting for transaction boundaries")
f.IntVar(&c.FanShards, "fanShards", 16,
"the number of concurrent connections to use when writing data in fan mode")
f.DurationVar(&c.RetryDelay, "retryDelay", 10*time.Second,
"the amount of time to sleep between replication retries")
f.StringVar(&c.stagingDB, "stagingDB", "_cdc_sink", "a SQL database to store metadata in")
f.DurationVar(&c.StandbyTimeout, "standbyTimeout", 5*time.Second,
"how often to commit the consistent point")
f.StringVar(&c.TargetConn, "targetConn", "", "the target cluster's connection string")
f.StringVar(&c.targetDB, "targetDB", "", "the SQL database in the target cluster to update")
f.IntVar(&c.TargetDBConns, "targetDBConns", 1024, "the maximum pool size to the target cluster")
}

// Copy returns a deep copy of the Config.
func (c *Config) Copy() *Config {
ret := *c
return &ret
}

// Preflight ensures that unset configuration options have sane defaults
// and returns an error if the Config is missing any fields for which a
// default connot be provided.
func (c *Config) Preflight() error {
if c.ApplyTimeout == 0 {
c.ApplyTimeout = defaultApplyTimeout
}
if c.BackfillWindow < 0 {
return errors.New("backfillWindow must be >= 0")
}
if c.BytesInFlight == 0 {
c.BytesInFlight = defaultBytesInFlight
}
Expand Down
40 changes: 33 additions & 7 deletions internal/source/logical/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ import (
"github.com/cockroachdb/cdc-sink/internal/util/stamp"
)

// Backfiller is an optional capability interface for Dialect
// implementations. The BackfillInto method will be called instead
// of ReadInto when the logical loop has detected a backfill state.
type Backfiller interface {
// BackfillInto represents a potentially-fragile source of
// logical-replication messages that should be applied in a
// high-throughput manner. Implementations should treat BackfillInto
// as a signal to "catch up" with replication and then return once
// the backfill process has completed.
//
// See also discussion on Dialect.ReadInto.
BackfillInto(ctx context.Context, ch chan<- Message, state State) error
}

// Dialect encapsulates the source-specific implementation details.
type Dialect interface {
// ReadInto represents a potentially-fragile source of
Expand All @@ -29,18 +43,21 @@ type Dialect interface {
// called again to restart the replication feed from the most recent
// consistent point.
//
// The "from" argument is the last consistent point that was
// The state argument provides the last consistent point that was
// processed by the stream. This can be used to verify successful
// resynchronization with the source database.
ReadInto(ctx context.Context, ch chan<- Message, state State) error

// Process decodes the logical replication messages, to call the
// various OnEvent methods. If this method returns an error, the
// entire replication loop will be restarted.
// various Events methods. Implementations of Process should exit
// gracefully when the channel is closed, this may represent a
// switch from backfilling to a streaming mode. If this method
// returns an error, the entire replication loop will be restarted.
Process(ctx context.Context, ch <-chan Message, events Events) error

// UnmarshalStamp decodes a stamp in string format to a stamp.Stamp.
UnmarshalStamp([]byte) (stamp.Stamp, error)
// ZeroStamp constructs a new, zero-valued stamp that represents
// a consistent point at the beginning of the source's history.
ZeroStamp() stamp.Stamp
}

// A Message is specific to a Dialect.
Expand Down Expand Up @@ -71,15 +88,24 @@ type Events interface {
// message is encountered, to ensure that all internal state has
// been resynchronized.
OnRollback(ctx context.Context, msg Message) error

// stop is called after any attempt to run a replication loop.
// Implementations should block until any pending mutations have
// been committed.
stop()
}

// State provides information about a replication loop.
type State interface {
// GetConsistentPoint returns the most recent consistent point that
// has been committed to the target database.
// has been committed to the target database or the value returned
// from Dialect.ZeroStamp.
GetConsistentPoint() stamp.Stamp
// GetTargetDB returns the target database name.
GetTargetDB() ident.Ident

// setConsistentPoint is called from implementations of Events.
setConsistentPoint(stamp.Stamp)
}

// OffsetStamp is a Stamp which can represent itself as an absolute
Expand All @@ -90,7 +116,7 @@ type OffsetStamp interface {
}

// TimeStamp is a Stamp which can represent itself as a time.Time. This
// is used for optional metrics reporting.
// is used to enable backfill mode and for metrics reporting.
type TimeStamp interface {
stamp.Stamp
AsTime() time.Time
Expand Down
78 changes: 78 additions & 0 deletions internal/source/logical/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package logical

import (
"context"
"fmt"
"sync"

"github.com/cockroachdb/cdc-sink/internal/target/apply/fan"
"github.com/cockroachdb/cdc-sink/internal/types"
"github.com/jackc/pgx/v4/pgxpool"
)

// Factory supports uses cases where it is desirable to have multiple,
// independent logical loops that share common resources.
type Factory struct {
appliers types.Appliers
cfg *Config
fans *fan.Fans
memo types.Memo
pool *pgxpool.Pool

mu struct {
sync.Mutex
cancels []func()
loops map[string]*Loop
}
}

// Close terminates all running loops and waits for them to shut down.
func (f *Factory) Close() {
f.mu.Lock()
defer f.mu.Unlock()
for _, cancel := range f.mu.cancels {
// These are just context-cancellations functions that return
// immediately, so we need to wait for the loops to stop below.
cancel()
}
for _, loop := range f.mu.loops {
<-loop.Stopped()
}
f.mu.cancels = nil
f.mu.loops = make(map[string]*Loop)
}

// Get constructs or retrieves the named Loop.
func (f *Factory) Get(ctx context.Context, name string, dialect Dialect) (*Loop, error) {
f.mu.Lock()
defer f.mu.Unlock()

if found, ok := f.mu.loops[name]; ok {
return found, nil
}

cfg := f.cfg.Copy()
if cfg.LoopName == "" {
cfg.LoopName = name
} else {
cfg.LoopName = fmt.Sprintf("%s-%s", cfg.LoopName, name)
}

ret, cancel, err := ProvideLoop(ctx, f.appliers, cfg, dialect, f.fans, f.memo, f.pool)
if err != nil {
return nil, err
}
f.mu.loops[name] = ret
f.mu.cancels = append(f.mu.cancels, cancel)
return ret, nil
}
Loading

0 comments on commit 6696851

Please sign in to comment.