Skip to content

Commit

Permalink
sql: add ttl_disable_changefeed_replication table storage parameter
Browse files Browse the repository at this point in the history
This patch adds a new `ttl_disable_changefeed_replication` table
storage parameter, which can be used to disable changefeed replication
for row-level TTL on a per-table basis.

Release note (sql change): A new `ttl_disable_changefeed_replication`
table storage parameter has been added and can be used to disable
changefeed replication for row-level TTL on a per-table basis.
  • Loading branch information
andyyang890 committed Feb 24, 2024
1 parent cd723f0 commit b587822
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 85 deletions.
189 changes: 112 additions & 77 deletions pkg/cmd/roachtest/tests/cdc_filtering.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,22 @@ func registerCDCFiltering(r registry.Registry) {
Run: runCDCSessionFiltering,
})
r.Add(registry.TestSpec{
Name: "cdc/filtering/ttl",
Name: "cdc/filtering/ttl/cluster",
Owner: registry.OwnerCDC,
Cluster: r.MakeClusterSpec(3),
CompatibleClouds: registry.AllClouds,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Run: runCDCTTLFiltering,
Run: runCDCTTLFiltering(ttlFilteringClusterSetting),
})
r.Add(registry.TestSpec{
Name: "cdc/filtering/ttl/table",
Owner: registry.OwnerCDC,
Cluster: r.MakeClusterSpec(3),
CompatibleClouds: registry.AllClouds,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Run: runCDCTTLFiltering(ttlFilteringTableStorageParam),
})
}

Expand Down Expand Up @@ -283,98 +292,124 @@ func checkCDCEvents[S any](
return nil
}

func runCDCTTLFiltering(ctx context.Context, t test.Test, c cluster.Cluster) {
t.Status("starting cluster")
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings())
conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()
// ttlFilteringType is the type of TTL filtering to use in the test function
// returned by runCDCTTLFiltering.
type ttlFilteringType bool

const (
// ttlFilteringClusterSetting denotes TTL filtering enabled via setting the
// sql.ttl.changefeed_replication.disabled cluster setting.
ttlFilteringClusterSetting = false
// ttlFilteringTableStorageParam denotes TTL filtering enabled via setting the
// ttl_disable_changefeed_replication storage parameter on the table.
ttlFilteringTableStorageParam = true
)

// kv.rangefeed.enabled is required for changefeeds to run
_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
require.NoError(t, err)
func runCDCTTLFiltering(
filteringType ttlFilteringType,
) func(ctx context.Context, t test.Test, c cluster.Cluster) {
return func(ctx context.Context, t test.Test, c cluster.Cluster) {
t.Status("starting cluster")
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings())
conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()

// kv.rangefeed.enabled is required for changefeeds to run
_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
require.NoError(t, err)

t.Status("creating table with TTL")
_, err = conn.ExecContext(ctx, `CREATE TABLE events (
t.Status("creating table with TTL")
_, err = conn.ExecContext(ctx, `CREATE TABLE events (
id STRING PRIMARY KEY,
expired_at TIMESTAMPTZ
) WITH (ttl_expiration_expression = 'expired_at', ttl_job_cron = '* * * * *')`)
require.NoError(t, err)
require.NoError(t, err)

t.Status("creating changefeed")
var jobID int
err = conn.QueryRowContext(ctx, `CREATE CHANGEFEED FOR TABLE events
t.Status("creating changefeed")
var jobID int
err = conn.QueryRowContext(ctx, `CREATE CHANGEFEED FOR TABLE events
INTO 'nodelocal://1/events'
WITH diff, updated, min_checkpoint_frequency = '1s'`).Scan(&jobID)
require.NoError(t, err)

const (
expiredTime = "2000-01-01"
notExpiredTime = "2200-01-01"
)
require.NoError(t, err)

t.Status("insert initial table data")
_, err = conn.Exec(`INSERT INTO events VALUES ('A', $1), ('B', $2)`, expiredTime, notExpiredTime)
require.NoError(t, err)
const (
expiredTime = "2000-01-01"
notExpiredTime = "2200-01-01"
)

t.Status("wait for TTL to run and delete rows")
err = waitForTTL(ctx, conn, "defaultdb.public.events", timeutil.Now())
require.NoError(t, err)
t.Status("insert initial table data")
_, err = conn.Exec(`INSERT INTO events VALUES ('A', $1), ('B', $2)`, expiredTime, notExpiredTime)
require.NoError(t, err)

t.Status("check that rows are deleted")
var countA int
err = conn.QueryRow(`SELECT count(*) FROM events WHERE id = 'A'`).Scan(&countA)
require.NoError(t, err)
require.Equal(t, countA, 0)
t.Status("wait for TTL to run and delete rows")
err = waitForTTL(ctx, conn, "defaultdb.public.events", timeutil.Now())
require.NoError(t, err)

t.Status("set sql.ttl.changefeed_replication.disabled")
_, err = conn.ExecContext(ctx, `SET CLUSTER SETTING sql.ttl.changefeed_replication.disabled = true`)
require.NoError(t, err)
t.Status("check that rows are deleted")
var countA int
err = conn.QueryRow(`SELECT count(*) FROM events WHERE id = 'A'`).Scan(&countA)
require.NoError(t, err)
require.Equal(t, countA, 0)

t.Status("update remaining rows to be expired")
_, err = conn.Exec(`UPDATE events SET expired_at = $1 WHERE id = 'B'`, expiredTime)
require.NoError(t, err)
switch filteringType {
case ttlFilteringClusterSetting:
t.Status("set sql.ttl.changefeed_replication.disabled")
_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING sql.ttl.changefeed_replication.disabled = true`)
require.NoError(t, err)
case ttlFilteringTableStorageParam:
t.Status("set ttl_disable_changefeed_replication")
_, err := conn.ExecContext(ctx, `ALTER TABLE events SET (ttl_disable_changefeed_replication = true)`)
require.NoError(t, err)
default:
panic("unknown TTL filtering type")
}

t.Status("wait for TTL to run and delete rows")
err = waitForTTL(ctx, conn, "defaultdb.public.events", timeutil.Now().Add(time.Minute))
require.NoError(t, err)
t.Status("update remaining rows to be expired")
_, err = conn.Exec(`UPDATE events SET expired_at = $1 WHERE id = 'B'`, expiredTime)
require.NoError(t, err)

t.Status("check that rows are deleted")
var countB int
err = conn.QueryRow(`SELECT count(*) FROM events WHERE id = 'B'`).Scan(&countB)
require.NoError(t, err)
require.Equal(t, countB, 0)
t.Status("wait for TTL to run and delete rows")
err = waitForTTL(ctx, conn, "defaultdb.public.events", timeutil.Now().Add(time.Minute))
require.NoError(t, err)

expectedEvents := []string{
// initial
"A@2000-01-01T00:00:00Z", "B@2200-01-01T00:00:00Z",
// TTL deletes A
"<deleted> (before: A@2000-01-01T00:00:00Z)",
// update B to be expired
"B@2000-01-01T00:00:00Z (before: B@2200-01-01T00:00:00Z)",
// TTL deletes B (no events)
}
type state struct {
ID string `json:"id"`
ExpiredAt string `json:"expired_at"`
t.Status("check that rows are deleted")
var countB int
err = conn.QueryRow(`SELECT count(*) FROM events WHERE id = 'B'`).Scan(&countB)
require.NoError(t, err)
require.Equal(t, countB, 0)

expectedEvents := []string{
// initial
"A@2000-01-01T00:00:00Z", "B@2200-01-01T00:00:00Z",
// TTL deletes A
"<deleted> (before: A@2000-01-01T00:00:00Z)",
// update B to be expired
"B@2000-01-01T00:00:00Z (before: B@2200-01-01T00:00:00Z)",
// TTL deletes B (no events)
}
type state struct {
ID string `json:"id"`
ExpiredAt string `json:"expired_at"`
}
err = checkCDCEvents[state](ctx, t, c, conn, jobID, "events",
// Produce a canonical format that we can assert on. The format is of the
// form: id@exp_at (before: id@exp_at)[, id@exp_at (before: id@exp_at), ...]
func(before *state, after *state) string {
var s string
if after == nil {
s += "<deleted>"
} else {
s += fmt.Sprintf("%s@%s", after.ID, after.ExpiredAt)
}
if before != nil {
s += fmt.Sprintf(" (before: %s@%s)", before.ID, before.ExpiredAt)
}
return s
},
expectedEvents,
)
require.NoError(t, err)
}
err = checkCDCEvents[state](ctx, t, c, conn, jobID, "events",
// Produce a canonical format that we can assert on. The format is of the
// form: id@exp_at (before: id@exp_at)[, id@exp_at (before: id@exp_at), ...]
func(before *state, after *state) string {
var s string
if after == nil {
s += "<deleted>"
} else {
s += fmt.Sprintf("%s@%s", after.ID, after.ExpiredAt)
}
if before != nil {
s += fmt.Sprintf(" (before: %s@%s)", before.ID, before.ExpiredAt)
}
return s
},
expectedEvents,
)
require.NoError(t, err)
}

// waitForTTL waits until the row-level TTL job for a given table has run
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/catpb/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ message RowLevelTTL {
optional string expiration_expr = 11 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
// SelectRateLimit is the maximum amount of rows to select per second.
optional int64 select_rate_limit = 12 [(gogoproto.nullable)=false];
// DisableChangefeedReplication disables changefeed replication for the
// deletes performed by the TTL job.
optional bool disable_changefeed_replication = 13 [(gogoproto.nullable) = false];
}

// AutoStatsSettings represents settings related to automatic statistics
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -2419,6 +2419,9 @@ func (desc *wrapper) GetStorageParams(spaceBetweenEqual bool) []string {
if labelMetrics := ttl.LabelMetrics; labelMetrics {
appendStorageParam(`ttl_label_metrics`, fmt.Sprintf(`%t`, labelMetrics))
}
if ttl.DisableChangefeedReplication {
appendStorageParam(`ttl_disable_changefeed_replication`, fmt.Sprintf("%t", ttl.DisableChangefeedReplication))
}
}
if exclude := desc.GetExcludeDataFromBackup(); exclude {
appendStorageParam(`exclude_data_from_backup`, `true`)
Expand Down
17 changes: 11 additions & 6 deletions pkg/sql/logictest/testdata/logic_test/row_level_ttl
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ CREATE TABLE tbl_reloptions (
ttl_delete_rate_limit = 40,
ttl_pause = true,
ttl_row_stats_poll_interval = '1 minute',
ttl_label_metrics = true
ttl_label_metrics = true,
ttl_disable_changefeed_replication = true
)

query T rowsort
Expand All @@ -248,6 +249,7 @@ ttl_delete_rate_limit=40
ttl_pause=true
ttl_row_stats_poll_interval='1m0s'
ttl_label_metrics=true
ttl_disable_changefeed_replication=true

subtest end

Expand Down Expand Up @@ -775,7 +777,8 @@ CREATE TABLE tbl_set_ttl_params (
ttl_delete_rate_limit = 40,
ttl_pause = true,
ttl_row_stats_poll_interval = '1 minute',
ttl_label_metrics = true
ttl_label_metrics = true,
ttl_disable_changefeed_replication = true
)

query T
Expand All @@ -785,7 +788,7 @@ CREATE TABLE public.tbl_set_ttl_params (
id INT8 NOT NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_select_rate_limit = 30, ttl_delete_rate_limit = 40, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_select_rate_limit = 30, ttl_delete_rate_limit = 40, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true, ttl_disable_changefeed_replication = true)

statement ok
ALTER TABLE tbl_set_ttl_params SET (ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_row_stats_poll_interval = '2m0s')
Expand All @@ -797,7 +800,7 @@ CREATE TABLE public.tbl_set_ttl_params (
id INT8 NOT NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true, ttl_disable_changefeed_replication = true)

statement ok
ALTER TABLE tbl_set_ttl_params RESET (
Expand All @@ -806,7 +809,9 @@ ALTER TABLE tbl_set_ttl_params RESET (
ttl_select_rate_limit,
ttl_delete_rate_limit,
ttl_pause,
ttl_row_stats_poll_interval
ttl_row_stats_poll_interval,
ttl_label_metrics,
ttl_disable_changefeed_replication
)

query T
Expand All @@ -816,7 +821,7 @@ CREATE TABLE public.tbl_set_ttl_params (
id INT8 NOT NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_label_metrics = true)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL)

subtest end

Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/storageparam/tablestorageparam/table_storage_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,23 @@ var tableParams = map[string]tableParam{
return nil
},
},
`ttl_disable_changefeed_replication`: {
onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error {
b, err := boolFromDatum(ctx, evalCtx, key, datum)
if err != nil {
return err
}
rowLevelTTL := po.getOrCreateRowLevelTTL()
rowLevelTTL.DisableChangefeedReplication = b
return nil
},
onReset: func(ctx context.Context, po *Setter, evalCtx *eval.Context, key string) error {
if po.hasRowLevelTTL() {
po.UpdatedRowLevelTTL.DisableChangefeedReplication = false
}
return nil
},
},
`exclude_data_from_backup`: {
onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext,
evalCtx *eval.Context, key string, datum tree.Datum) error {
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/ttl/ttlbase/ttl_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,12 @@ func CheckJobEnabled(settingsValues *settings.Values) error {

// GetChangefeedReplicationDisabled returns whether changefeed replication
// should be disabled for this job based on the relevant cluster setting.
func GetChangefeedReplicationDisabled(settingsValues *settings.Values) bool {
func GetChangefeedReplicationDisabled(
settingsValues *settings.Values, ttl *catpb.RowLevelTTL,
) bool {
if ttl.DisableChangefeedReplication {
return true
}
return changefeedReplicationDisabled.Get(settingsValues)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re
deleteBatchSize := ttlbase.GetDeleteBatchSize(settingsValues, rowLevelTTL)
selectRateLimit := ttlbase.GetSelectRateLimit(settingsValues, rowLevelTTL)
deleteRateLimit := ttlbase.GetDeleteRateLimit(settingsValues, rowLevelTTL)
disableChangefeedReplication := ttlbase.GetChangefeedReplicationDisabled(settingsValues)
disableChangefeedReplication := ttlbase.GetChangefeedReplicationDisabled(settingsValues, rowLevelTTL)
newTTLSpec := func(spans []roachpb.Span) *execinfrapb.TTLSpec {
return &execinfrapb.TTLSpec{
JobID: jobID,
Expand Down

0 comments on commit b587822

Please sign in to comment.