
CockroachDB to CockroachDB

The primary use case for Replicator is to write a CockroachDB changefeed into a destination database. Currently, CockroachDB, PostgreSQL and Oracle Database are valid targets for Replicator.

A thorough understanding of CockroachDB CDC features is a prerequisite for any operators deploying Replicator in a changefeed configuration.

Overview

  • A source CRDB cluster emits changes in near-real time via the enterprise CHANGEFEED feature.
  • Replicator accepts the changes via an HTTP(S) endpoint.
  • Mutations are staged in a CockroachDB cluster (this is usually also the target cluster in CockroachDB-to-CockroachDB deployments).
  • Replicator applies the changes to the target in one of several modes.
+------------------------+
|     source CRDB        |
|                        |
| CDC {source_table(s)}  |
+------------------------+
          |
          V
   http://ip:26258
+---------------------+       +----------------------+
|      Replicator     |  pgx  |     staging CRDB     |
|                     | ----> | (usually the target) |
+---------------------+       +----------------------+
          |
          V
     sql://target.db
+------------------------+
|       target           |
| {destination_table(s)} |
+------------------------+

Theory of operation

  • Replicator receives a stream of partially-ordered row mutations from one or more nodes in the source cluster.
  • The leading path segments from the URL are taken as the destination schema.
    • For a CockroachDB or PostgreSQL target, this would generally be /target_database/public, or the user-defined schema in which the target tables are defined.
    • For an Oracle Database target, this would generally be the /TARGET_SCHEMA or a synonym thereof.
  • The incoming mutations are staged on a per-table basis and ordered by the MVCC timestamps provided by the changefeed's updated option.
  • Upon receipt of a resolved timestamp, mutations whose timestamps are less than the new resolved timestamp are dequeued.
  • The dequeued mutations are applied to the target tables, either as UPSERT or DELETE operations (or their analogs in non-CockroachDB targets).
  • The resolved timestamp is stored for future use.

⚠️ There should be, at most, one source CHANGEFEED per target SCHEMA per staging database. That is, multiple changefeeds should not target the same instance of Replicator if they write to the same target schema. This limitation is expected to be removed with additional points of configuration to define non-overlapping table groups. It is not currently possible to detect this misconfiguration without additional metadata from the source changefeed.

Multiple instances of Replicator should be used in production scenarios and deployed behind an HTTP load balancer.

The behavior described above, staging and application as two phases, ensures that the target tables will be in a transactionally-consistent state with respect to the source database. This is desirable in a steady-state configuration; however, it imposes limitations on the maximum throughput or transaction size that Replicator is able to achieve. In situations where a large volume or a high rate of data must be transferred, e.g., an initial CDC CHANGEFEED backfill, the server may be started with the --immediate command-line parameter in order to apply all mutations to the target table without staging them (see the Immediate mode section below).

Replicator relies heavily upon the delivery guarantees provided by a CockroachDB CHANGEFEED. When errors are encountered while staging or applying mutations, Replicator will return them to the incoming changefeed request. Errors will then become visible in the output of SHOW JOBS or in the administrative UI on the source cluster.
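
For example, a changefeed job's status and any surfaced error can be inspected on the source cluster. This is a sketch, assuming the single-node setup from the Example section below:

# errors returned by Replicator surface in the changefeed's job record
cockroach sql --port 30000 --insecure -e "SELECT job_id, status, error FROM [SHOW JOBS] WHERE job_type = 'CHANGEFEED'"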

Instructions

  1. In the source cluster, choose the table(s) that you would like to replicate.
  2. In the destination cluster, re-create those tables within a single SQL database schema and match the table definitions exactly:
    • Don't create any new constraints on any destination table(s).
    • It's imperative that the columns are named exactly the same in the destination table.
  3. Create the staging database _replicator in the staging cluster. In a CockroachDB-to-CockroachDB deployment, the target cluster is generally used as the staging cluster.
    • It is recommended that you reduce the default GC time to five minutes, since these tables have a high volume of deletes.
    • ALTER DATABASE _replicator CONFIGURE ZONE USING gc.ttlseconds=300
  4. Start Replicator either on
    • all nodes of the destination cluster
    • one or more servers with a low-latency connection to the destination cluster
  5. Set the cluster setting of the source cluster to enable range feeds: SET CLUSTER SETTING kv.rangefeed.enabled = true
  6. Once Replicator starts up, create a CDC feed on the source cluster
    • CREATE CHANGEFEED FOR TABLE [source_table] INTO 'webhook-https://[replicator-host:port]/target_database/target_schema?insecure_tls_skip_verify=true' WITH updated, resolved='1s', min_checkpoint_frequency='1s', webhook_sink_config='{"Flush":{"Bytes":1048576,"Frequency":"1s"}}'
    • The target_database path element is the name passed to the CREATE DATABASE command.
    • The target_schema element is the name of the user-defined schema in the target database. By default, every SQL DATABASE has a SCHEMA whose name is "public".
      • Non-CockroachDB targets may have only a single-level namespace. In that case, the path would simply be /target_schema.
    • Be sure to always use the updated, resolved, and min_checkpoint_frequency options, as these are required for timely replication.
    • The protect_data_from_gc_on_pause option is not required, but can be used in situations where a Replicator instance may be unavailable for periods longer than the source's garbage-collection window (25 hours by default); see the sketch after this list.
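
As a sketch, this is the changefeed from step 6 with the optional GC protection added; replicator-host, the port, and the schema names are the same placeholders used above:

cockroach sql --insecure --port 30000 <<EOF
CREATE CHANGEFEED FOR TABLE source_table
  INTO 'webhook-https://replicator-host:26258/target_database/target_schema?insecure_tls_skip_verify=true'
  WITH updated, resolved='1s', min_checkpoint_frequency='1s', protect_data_from_gc_on_pause,
       webhook_sink_config='{"Flush":{"Bytes":1048576,"Frequency":"1s"}}';
EOF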

Important Note

If you're using the webhook sink from CockroachDB version 23.1.x, be sure to enable the changefeed.new_webhook_sink_enabled cluster setting for significantly improved throughput. See the webhook sink docs for more details. This setting should not be required for version 23.2 and beyond.
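
For example, on a 23.1.x source cluster:

# enable the improved webhook sink (CockroachDB 23.1.x only)
cockroach sql --insecure -e "SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true"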

Starting Changefeed Replication

Usage:
  replicator start [flags]

Flags:
      --applyTimeout duration          the maximum amount of time to wait for an update to be applied (default 30s)
      --assumeIdempotent               disable the extra staging table queries that debounce non-idempotent redelivery in changefeeds
      --bestEffortOnly                 eventually-consistent mode; useful for high throughput, skew-tolerant schemas with FKs
      --bestEffortWindow duration      use an eventually-consistent mode for initial backfill or when replication is behind; 0 to disable (default 1h0m0s)
      --bindAddr string                the network address to bind to (default ":26258")
      --disableAuthentication          disable authentication of incoming Replicator requests; not recommended for production.
      --discard                        (dangerous) discard all incoming HTTP requests; useful for changefeed throughput testing
      --discardDelay duration          adds additional delay in discard mode; useful for gauging the impact of changefeed RTT
      --dlqTableName ident             the name of a table in the target schema for storing dead-letter entries (default replicator_dlq)
      --flushPeriod duration           flush queued mutations after this duration (default 1s)
      --flushSize int                  ideal batch size to determine when to flush mutations (default 1000)
      --httpResponseTimeout duration   the maximum amount of time to allow an HTTP handler to execute for (default 2m0s)
      --immediate                      bypass staging tables and write directly to target; recommended only for KV-style workloads with no FKs
      --metricsAddr string             a host:port on which to serve metrics and diagnostics
      --ndjsonBufferSize int           the maximum amount of data to buffer while reading a single line of ndjson input; increase when source cluster has large blob values (default 65536)
      --parallelism int                the number of concurrent database transactions to use (default 16)
      --quiescentPeriod duration       how often to retry deferred mutations (default 10s)
      --retireOffset duration          delay removal of applied mutations (default 24h0m0s)
      --scanSize int                   the number of rows to retrieve from staging (default 10000)
      --stagingConn string             the staging database's connection string
      --stagingCreateSchema            automatically create the staging schema if it does not exist
      --stagingIdleTime duration       maximum lifetime of an idle connection (default 1m0s)
      --stagingJitterTime duration     the time over which to jitter database pool disconnections (default 15s)
      --stagingMaxLifetime duration    the maximum lifetime of a database connection (default 5m0s)
      --stagingMaxPoolSize int         the maximum number of staging database connections (default 128)
      --stagingSchema atom             a SQL database schema to store metadata in (default _replicator.public)
      --targetConn string              the target database's connection string
      --targetIdleTime duration        maximum lifetime of an idle connection (default 1m0s)
      --targetJitterTime duration      the time over which to jitter database pool disconnections (default 15s)
      --targetMaxLifetime duration     the maximum lifetime of a database connection (default 5m0s)
      --targetMaxPoolSize int          the maximum number of target database connections (default 128)
      --targetStatementCacheSize int   the maximum number of prepared statements to retain (default 128)
      --timestampLimit int             the maximum number of source timestamps to coalesce into a target transaction (default 1000)
      --tlsCertificate string          a path to a PEM-encoded TLS certificate chain
      --tlsPrivateKey string           a path to a PEM-encoded TLS private key
      --tlsSelfSigned                  if true, generate a self-signed TLS certificate valid for 'localhost'
      --userscript string              the path to a configuration script, see userscript subcommand

Global Flags:
      --gracePeriod duration    allow background processes to exit (default 30s)
      --logDestination string   write logs to a file, instead of stdout
      --logFormat string        choose log output format [ fluent, text ] (default "text")
  -v, --verbose count           increase logging verbosity to debug; repeat for trace

Example

Start by installing Replicator, via container or by downloading a binary.

# source CRDB is single node
cockroach start-single-node --listen-addr :30000 --http-addr :30001 --store cockroach-data/30000 --insecure --background

# target CRDB is single node as well
cockroach start-single-node --listen-addr :30002 --http-addr :30003 --store cockroach-data/30002 --insecure --background

# source ycsb.usertable is populated with 10 rows
cockroach workload init ycsb 'postgresql://root@localhost:30000/ycsb?sslmode=disable' --families=false --insert-count=10

# target ycsb.usertable is empty
cockroach workload init ycsb 'postgresql://root@localhost:30002/ycsb?sslmode=disable' --families=false --insert-count=0

# source has rows, target is empty
cockroach sql --port 30000 --insecure -e "SELECT ycsb_key FROM ycsb.usertable ORDER BY ycsb_key"
cockroach sql --port 30002 --insecure -e "SELECT ycsb_key FROM ycsb.usertable ORDER BY ycsb_key"

# create staging database for Replicator
cockroach sql --port 30002 --insecure -e "CREATE DATABASE _replicator"
# Replicator started as a background task. Remove the tls flag for CockroachDB <= v21.1
replicator start --bindAddr :30004 --tlsSelfSigned --disableAuthentication --targetConn 'postgresql://root@localhost:30002/?sslmode=disable' &

# start the CDC that will send across the initial data snapshot
# Versions of CRDB before v21.2 should use the experimental-http:// URL scheme
cockroach sql --insecure --port 30000 <<EOF
-- add enterprise license
-- SET CLUSTER SETTING cluster.organization = 'Acme Company';
-- SET CLUSTER SETTING enterprise.license = 'xxxxxxxxxxxx';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
CREATE CHANGEFEED FOR TABLE YCSB.USERTABLE
  INTO 'webhook-https://127.0.0.1:30004/ycsb/public?insecure_tls_skip_verify=true'
  WITH updated, resolved='1s', min_checkpoint_frequency='1s',
       webhook_sink_config='{"Flush":{"Bytes":1048576,"Frequency":"1s"}}';
EOF

# source has rows, target is the same as the source
cockroach sql --port 30000 --insecure -e "SELECT ycsb_key FROM ycsb.usertable ORDER BY ycsb_key"
cockroach sql --port 30002 --insecure -e "SELECT ycsb_key FROM ycsb.usertable ORDER BY ycsb_key"

# start the YCSB workload; the incremental updates happen automagically
# note that these are just updates to the already existing rows
cockroach workload run ycsb 'postgresql://root@localhost:30000/ycsb?sslmode=disable' --families=false --insert-count=100 --concurrency=1 &

# source updates are applied to the target
# you should see changes in the source be propagated to the destination
# while running, you can see the MVCC timestamps increasing (but they will not match, since these are different transactions)
cockroach sql --port 30000 --insecure -e "SELECT ycsb_key, LEFT(field0,10) AS field0, crdb_internal_mvcc_timestamp FROM ycsb.usertable ORDER BY ycsb_key"
cockroach sql --port 30002 --insecure -e "SELECT ycsb_key, LEFT(field0,10) AS field0, crdb_internal_mvcc_timestamp FROM ycsb.usertable ORDER BY ycsb_key"

Operating modes

Transactional mode

The default mode of operation provides transactionally-consistent logical replication of data from the source to the target. Incoming mutations are written to a staging table and applied upon receipt of a resolved-timestamp (checkpoint) message from the changefeed.

The transactional mode will attempt to apply source transactions concurrently, provided that they do not modify any overlapping rows in the target tables. This preserves the serializable guarantees provided by CockroachDB. The number of concurrent attempts can be controlled via the --parallelism flag. Setting this to a value of 1 will enforce a strictly serial mode of operation.
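
As a sketch, reusing the connection strings from the Example section above, a strictly serial deployment would look like:

# apply source transactions one at a time, in commit order
replicator start --bindAddr :30004 --tlsSelfSigned --disableAuthentication \
  --targetConn 'postgresql://root@localhost:30002/?sslmode=disable' \
  --parallelism 1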

Best-effort mode

If replication delay exceeds the duration specified by the --bestEffortWindow flag or the --bestEffortOnly flag is present, Replicator will operate in a reduced-consistency mode to improve throughput. In this mode, incoming mutations are applied immediately to a target table. In cases where a mutation cannot be applied (e.g.: foreign key constraint violations), it will be written to the staging table. Staged mutations are applied incrementally, on a best-effort basis. When replication has caught up, Replicator will switch back to its default mode of operation.
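
A sketch of a time-bounded best-effort configuration, again borrowing the Example section's connection strings; pass --bestEffortOnly instead of --bestEffortWindow to remain in this mode permanently:

# degrade to best-effort application after one hour of replication lag
replicator start --bindAddr :30004 --tlsSelfSigned --disableAuthentication \
  --targetConn 'postgresql://root@localhost:30002/?sslmode=disable' \
  --bestEffortWindow 1h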

Immediate mode

Immediate mode writes incoming mutations to the target schema as soon as they are received, instead of waiting for a resolved timestamp. Transaction boundaries from the source database will not be preserved, but overall load on the destination database will be reduced. This sacrifices transaction atomicity for performance, and may be appropriate in eventually-consistent use cases.

Immediate mode is enabled by passing the --immediate flag.
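
For example, reusing the Example section's connection strings:

# bypass staging and write mutations directly to the target
replicator start --bindAddr :30004 --tlsSelfSigned --disableAuthentication \
  --targetConn 'postgresql://root@localhost:30002/?sslmode=disable' \
  --immediate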

Change Data Capture Queries

In CockroachDB, starting with release 23.1, users can create CDC feeds using Change Data Capture Queries. The expression syntax provides a way to select columns and apply filters to further restrict or transform the data emitted by the feed. To work with Replicator, the feed must specify the target table by adding the table name to the sink URL. In addition, to emit all the values from a deleted row, the changefeed requires the envelope=wrapped, format=json, and diff options:

CREATE CHANGEFEED
INTO '{scheme}://{host}:{port}/{target_database}/{target_schema}/{target_table}?{query_parameters}'
WITH envelope='wrapped', format='json', diff [additional options]
AS SELECT
  projection
FROM
  table
[WHERE predicate];

For example, to define a change feed for a webhook sink:

CREATE CHANGEFEED
INTO 'webhook-https://127.0.0.1:30004/ycsb/public/usertable?insecure_tls_skip_verify=true'
WITH envelope='wrapped', format='json', diff, updated, resolved='1s', min_checkpoint_frequency='1s',
     webhook_sink_config='{"Flush":{"Bytes":1048576,"Frequency":"1s"}}'
AS SELECT
     *
FROM
     ycsb.usertable;

Limitations

Refer to CockroachDB known limitations for general limitations on the source changefeeds.

Foreign keys

  • Foreign keys are supported in transactionally-consistent modes, but do have a minor throughput penalty.
  • Replicator will order updates to referenced (parent) tables before referencing (child) tables.
  • Cyclical table dependency graphs are unsupported (and would be difficult to use in CockroachDB without deferrable constraints).
    • Self-referential tables are supported, provided that a parent row is created in a transaction separate from any child rows, as shown in the sketch below.
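
A minimal sketch of the self-referential pattern on the source cluster; the db.tree table and its columns are hypothetical:

# the parent row commits in its own transaction...
cockroach sql --port 30000 --insecure -e "INSERT INTO db.tree (id, parent_id) VALUES (1, NULL)"
# ...before any child row that references it
cockroach sql --port 30000 --insecure -e "INSERT INTO db.tree (id, parent_id) VALUES (2, 1)"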

JSONB columns

Due to ambiguity in how JSONB values are encoded by CockroachDB, it is not possible to distinguish the SQL NULL value from a JSONB null token. We do not recommend the use of a nullable JSONB column if the JSONB null token may be used as a value. Instead, it is preferable to declare the destination column as JSONB NOT NULL and use a substitute expression to replace a SQL NULL value with the JSONB null token.
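
A minimal sketch of this pattern; the target_db.events table is hypothetical, and COALESCE is shown as one possible substitute expression:

# declare the destination column NOT NULL so a SQL NULL can never be stored
cockroach sql --port 30002 --insecure -e "CREATE TABLE target_db.events (id INT PRIMARY KEY, payload JSONB NOT NULL)"
# a substitute expression maps SQL NULL to the JSONB null token
cockroach sql --port 30002 --insecure -e "SELECT COALESCE(NULL::JSONB, 'null'::JSONB)"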

Schema Changes

  • Schema changes are not automatically propagated from the source to the destination.
  • Replicator can support live schema changes, but the destination schema must be changed in a coordinated fashion with the source.
  • Adding columns or tables is possible if they are added first to the destination. New columns must either be nullable or have a DEFAULT value, so that the UPSERT commands used by Replicator need not reference them; see the sketch after this list.
  • Removing columns or tables should be performed on the source cluster first. The ignore column behavior may also be used.
  • Adding or removing secondary indexes is generally safe; Replicator does not make use of secondary indexes.
  • Different zone configs are allowed.
  • Adding new computed columns should work, provided that they cannot reject any row.
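
A minimal sketch of the add-column flow, reusing the ycsb.usertable example from above; the extra column name is illustrative only:

# add the nullable column to the destination first...
cockroach sql --port 30002 --insecure -e "ALTER TABLE ycsb.usertable ADD COLUMN extra STRING"
# ...then add the same column to the source
cockroach sql --port 30000 --insecure -e "ALTER TABLE ycsb.usertable ADD COLUMN extra STRING"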