Skip to content

Commit

Permalink
Merge #109835
Browse files Browse the repository at this point in the history
109835: changefeedccl: add changefeed.lagging_ranges metric r=miretskiy a=jayshrivastava

This change adds the `changefeed.lagging_ranges` metric which can be used to track
ranges which are behind. This metric is calculated based on a new changefeed option
`lagging_ranges_threshold` which is the amount of time that a range
checkpoint needs to be in the past to be considered lagging. This defaults to 3 minutes.
This change also adds the changefeed option `lagging_ranges_polling_interval` which is
the polling rate at which a rangefeed will poll for lagging ranges and update the metric.
This defaults to 1 minute.

Sometimes a range may not have any checkpoints for a while because the start time
may be far in the past (this causes a catchup scan during which no checkpoints are emitted).
In this case, the range is considered to the lagging if the created timestamp of the
rangefeed is older than `changefeed.lagging_ranges_threshold`. Note that this means that
any changefeed which starts with an initial scan containing a significant amount of data will
likely indicate nonzero `changefeed.lagging_ranges` until the initial scan is complete. This
is intentional.

Release note (ops change): A new metric `changefeed.lagging_ranges` is added to show the number of
ranges which are behind in changefeeds. This metric can be used with the `metrics_label` changefeed
option. A changefeed option `lagging_ranges_threshold` is added which is the amount of
time a range needs to be behind to be considered lagging. By default this is 3 minutes. There is also
a new option `lagging_ranges_polling_interval` which controls how often the lagging ranges
calculation is done. This setting defaults to polling every 1 minute. 

Note that polling adds latency to the metric being updated. For example, if a range falls behind
by 3 minutes, the metric may not update until an additional minute afterwards.

Also note that ranges undergoing an initial scan for longer than the threshold are considered to be
lagging. Starting a changefeed with an initial scan on a large table will likely increment the metric
for each range in the table. However, as ranges complete the initial scan, the number of ranges will
decrease.

Epic: None

Co-authored-by: Shiranka Miskin <shiranka.miskin@gmail.com>
  • Loading branch information
craig[bot] and samiskin committed Sep 6, 2023
2 parents bd8ee15 + b5ec7df commit 4c90f3b
Show file tree
Hide file tree
Showing 11 changed files with 545 additions and 104 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ go_test(
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/roachpb",
Expand Down
72 changes: 48 additions & 24 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
pool = ca.knobs.MemMonitor
}
limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV)
ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit)
ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
if err != nil {
ca.MoveToDraining(err)
ca.cancel()
Expand Down Expand Up @@ -361,6 +361,7 @@ func (ca *changeAggregator) startKVFeed(
config ChangefeedConfig,
parentMemMon *mon.BytesMonitor,
memLimit int64,
opts changefeedbase.StatementOptions,
) (kvevent.Reader, chan struct{}, chan error, error) {
cfg := ca.flowCtx.Cfg
kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", memLimit, parentMemMon)
Expand All @@ -371,7 +372,7 @@ func (ca *changeAggregator) startKVFeed(

// KVFeed takes ownership of the kvevent.Writer portion of the buffer, while
// we return the kvevent.Reader part to the caller.
kvfeedCfg, err := ca.makeKVFeedCfg(ctx, config, spans, buf, initialHighWater, needsInitialScan, kvFeedMemMon)
kvfeedCfg, err := ca.makeKVFeedCfg(ctx, config, spans, buf, initialHighWater, needsInitialScan, kvFeedMemMon, opts)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -414,6 +415,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
initialHighWater hlc.Timestamp,
needsInitialScan bool,
memMon *mon.BytesMonitor,
opts changefeedbase.StatementOptions,
) (kvfeed.Config, error) {
schemaChange, err := config.Opts.GetSchemaChangeHandlingOptions()
if err != nil {
Expand All @@ -432,29 +434,51 @@ func (ca *changeAggregator) makeKVFeedCfg(
initialHighWater, &ca.metrics.SchemaFeedMetrics, config.Opts.GetCanHandle())
}

monitoringCfg, err := makeKVFeedMonitoringCfg(ca.sliMetrics, opts)
if err != nil {
return kvfeed.Config{}, err
}

return kvfeed.Config{
Writer: buf,
Settings: cfg.Settings,
DB: cfg.DB.KV(),
Codec: cfg.Codec,
Clock: cfg.DB.KV().Clock(),
Spans: spans,
CheckpointSpans: ca.spec.Checkpoint.Spans,
CheckpointTimestamp: ca.spec.Checkpoint.Timestamp,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
OnBackfillRangeCallback: ca.sliMetrics.getBackfillRangeCallback(),
MM: memMon,
InitialHighWater: initialHighWater,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV),
Writer: buf,
Settings: cfg.Settings,
DB: cfg.DB.KV(),
Codec: cfg.Codec,
Clock: cfg.DB.KV().Clock(),
Spans: spans,
CheckpointSpans: ca.spec.Checkpoint.Spans,
CheckpointTimestamp: ca.spec.Checkpoint.Timestamp,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
MM: memMon,
InitialHighWater: initialHighWater,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV),
MonitoringCfg: monitoringCfg,
}, nil
}

func makeKVFeedMonitoringCfg(
sliMetrics *sliMetrics, opts changefeedbase.StatementOptions,
) (kvfeed.MonitoringConfig, error) {
laggingRangesThreshold, laggingRangesInterval, err := opts.GetLaggingRangesConfig()
if err != nil {
return kvfeed.MonitoringConfig{}, err
}

return kvfeed.MonitoringConfig{
LaggingRangesCallback: sliMetrics.getLaggingRangesCallback(),
LaggingRangesThreshold: laggingRangesThreshold,
LaggingRangesPollingInterval: laggingRangesInterval,

OnBackfillCallback: sliMetrics.getBackfillCallback(),
OnBackfillRangeCallback: sliMetrics.getBackfillRangeCallback(),
}, nil
}

Expand Down
118 changes: 118 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -1226,6 +1228,122 @@ func TestChangefeedInitialScan(t *testing.T) {
cdcTest(t, testFn)
}

// TestChangefeedLaggingRangesMetrics tests the behavior of the
// changefeed.lagging_ranges metric.
func TestChangefeedLaggingRangesMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

// Ensure a fast closed timestamp interval so ranges can catch up fast.
kvserver.RangeFeedRefreshInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
closedts.SideTransportCloseInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)
closedts.TargetDuration.Override(
context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond)

skipMu := syncutil.Mutex{}
skippedRanges := map[string]struct{}{}
numRanges := 10
numRangesToSkip := int64(4)
var stopSkip atomic.Bool
// `shouldSkip` continuously skips checkpoints for the first `numRangesToSkip` ranges it sees.
// skipping is disabled by setting `stopSkip` to true.
shouldSkip := func(event *kvpb.RangeFeedEvent) bool {
if stopSkip.Load() {
return false
}
switch event.GetValue().(type) {
case *kvpb.RangeFeedCheckpoint:
sp := event.Checkpoint.Span
skipMu.Lock()
defer skipMu.Unlock()
if _, ok := skippedRanges[sp.String()]; ok || int64(len(skippedRanges)) < numRangesToSkip {
skippedRanges[sp.String()] = struct{}{}
return true
}
}
return false
}

knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)

knobs.FeedKnobs.RangefeedOptions = append(knobs.FeedKnobs.RangefeedOptions, kvcoord.TestingWithOnRangefeedEvent(
func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
return shouldSkip(event), nil
}),
)

registry := s.Server.JobRegistry().(*jobs.Registry)
sli1, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t1")
require.NoError(t, err)
laggingRangesTier1 := sli1.LaggingRanges
sli2, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t2")
require.NoError(t, err)
laggingRangesTier2 := sli2.LaggingRanges

assertLaggingRanges := func(tier string, expected int64) {
testutils.SucceedsWithin(t, func() error {
var laggingRangesObserved int64
if tier == "t1" {
laggingRangesObserved = laggingRangesTier1.Value()
} else {
laggingRangesObserved = laggingRangesTier2.Value()
}
if laggingRangesObserved != expected {
return fmt.Errorf("expected %d lagging ranges, but found %d", expected, laggingRangesObserved)
}
return nil
}, 10*time.Second)
}

sqlDB.Exec(t, fmt.Sprintf(`
CREATE TABLE foo (key INT PRIMARY KEY);
INSERT INTO foo (key) SELECT * FROM generate_series(1, %d);
ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d, 1));
`, numRanges, numRanges-1))
sqlDB.CheckQueryResults(t, `SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`,
[][]string{{fmt.Sprint(numRanges)}},
)

const laggingRangesOpts = `lagging_ranges_threshold="250ms", lagging_ranges_polling_interval="25ms"`
feed1Tier1 := feed(t, f,
fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1", %s`, laggingRangesOpts))
feed2Tier1 := feed(t, f,
fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1", %s`, laggingRangesOpts))
feed3Tier2 := feed(t, f,
fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t2", %s`, laggingRangesOpts))

assertLaggingRanges("t1", numRangesToSkip*2)
assertLaggingRanges("t2", numRangesToSkip)

stopSkip.Store(true)
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", 0)

stopSkip.Store(false)
assertLaggingRanges("t1", numRangesToSkip*2)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed1Tier1.Close())
assertLaggingRanges("t1", numRangesToSkip)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed2Tier1.Close())
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", numRangesToSkip)

require.NoError(t, feed3Tier2.Close())
assertLaggingRanges("t1", 0)
assertLaggingRanges("t2", 0)
}
// Can't run on tenants due to lack of SPLIT AT support (#54254)
cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks)
}

func TestChangefeedBackfillObservability(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
90 changes: 62 additions & 28 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,35 @@ const (

// Constants for the options.
const (
OptAvroSchemaPrefix = `avro_schema_prefix`
OptConfluentSchemaRegistry = `confluent_schema_registry`
OptCursor = `cursor`
OptCustomKeyColumn = `key_column`
OptEndTime = `end_time`
OptEnvelope = `envelope`
OptFormat = `format`
OptFullTableName = `full_table_name`
OptKeyInValue = `key_in_value`
OptTopicInValue = `topic_in_value`
OptResolvedTimestamps = `resolved`
OptMinCheckpointFrequency = `min_checkpoint_frequency`
OptUpdatedTimestamps = `updated`
OptMVCCTimestamps = `mvcc_timestamp`
OptDiff = `diff`
OptCompression = `compression`
OptSchemaChangeEvents = `schema_change_events`
OptSchemaChangePolicy = `schema_change_policy`
OptSplitColumnFamilies = `split_column_families`
OptExpirePTSAfter = `gc_protect_expires_after`
OptWebhookAuthHeader = `webhook_auth_header`
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`
OptMetricsScope = `metrics_label`
OptUnordered = `unordered`
OptVirtualColumns = `virtual_columns`
OptExecutionLocality = `execution_locality`
OptAvroSchemaPrefix = `avro_schema_prefix`
OptConfluentSchemaRegistry = `confluent_schema_registry`
OptCursor = `cursor`
OptCustomKeyColumn = `key_column`
OptEndTime = `end_time`
OptEnvelope = `envelope`
OptFormat = `format`
OptFullTableName = `full_table_name`
OptKeyInValue = `key_in_value`
OptTopicInValue = `topic_in_value`
OptResolvedTimestamps = `resolved`
OptMinCheckpointFrequency = `min_checkpoint_frequency`
OptUpdatedTimestamps = `updated`
OptMVCCTimestamps = `mvcc_timestamp`
OptDiff = `diff`
OptCompression = `compression`
OptSchemaChangeEvents = `schema_change_events`
OptSchemaChangePolicy = `schema_change_policy`
OptSplitColumnFamilies = `split_column_families`
OptExpirePTSAfter = `gc_protect_expires_after`
OptWebhookAuthHeader = `webhook_auth_header`
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`
OptMetricsScope = `metrics_label`
OptUnordered = `unordered`
OptVirtualColumns = `virtual_columns`
OptExecutionLocality = `execution_locality`
OptLaggingRangesThreshold = `lagging_ranges_threshold`
OptLaggingRangesPollingInterval = `lagging_ranges_polling_interval`

OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted`
OptVirtualColumnsNull VirtualColumnVisibility = `null`
Expand Down Expand Up @@ -348,6 +350,8 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptUnordered: flagOption,
OptVirtualColumns: enum("omitted", "null"),
OptExecutionLocality: stringOption,
OptLaggingRangesThreshold: durationOption,
OptLaggingRangesPollingInterval: durationOption,
}

// CommonOptions is options common to all sinks
Expand All @@ -360,7 +364,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
OptOnError,
OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered, OptCustomKeyColumn,
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
OptExecutionLocality,
OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
)

// SQLValidOptions is options exclusive to SQL sink
Expand Down Expand Up @@ -617,6 +621,8 @@ func (s StatementOptions) getEnumValue(k string) (string, error) {
return rawVal, nil
}

// getDurationValue validates that the option `k` was supplied with a
// valid duration.
func (s StatementOptions) getDurationValue(k string) (*time.Duration, error) {
v, ok := s.m[k]
if !ok {
Expand Down Expand Up @@ -935,6 +941,34 @@ func (s StatementOptions) GetMetricScope() (string, bool) {
return v, ok
}

// GetLaggingRangesConfig returns the threshold and polling rate to use for
// lagging ranges metrics.
func (s StatementOptions) GetLaggingRangesConfig() (
threshold time.Duration,
pollingInterval time.Duration,
e error,
) {
threshold = DefaultLaggingRangesThreshold
pollingInterval = DefaultLaggingRangesPollingInterval
_, ok := s.m[OptLaggingRangesThreshold]
if ok {
t, err := s.getDurationValue(OptLaggingRangesThreshold)
if err != nil {
return threshold, pollingInterval, err
}
threshold = *t
}
_, ok = s.m[OptLaggingRangesPollingInterval]
if ok {
i, err := s.getDurationValue(OptLaggingRangesPollingInterval)
if err != nil {
return threshold, pollingInterval, err
}
pollingInterval = *i
}
return threshold, pollingInterval, nil
}

// IncludeVirtual returns true if we need to set placeholder nulls for virtual columns.
func (s StatementOptions) IncludeVirtual() bool {
return s.m[OptVirtualColumns] == string(OptVirtualColumnsNull)
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,11 @@ var SinkPacerRequestSize = settings.RegisterDurationSetting(
50*time.Millisecond,
settings.PositiveDuration,
)

// DefaultLaggingRangesThreshold is the default duration by which a range must be
// lagging behind the present to be considered as 'lagging' behind in metrics.
var DefaultLaggingRangesThreshold = 3 * time.Minute

// DefaultLaggingRangesPollingInterval is the default polling rate at which
// lagging ranges are checked and metrics are updated.
var DefaultLaggingRangesPollingInterval = 1 * time.Minute

0 comments on commit 4c90f3b

Please sign in to comment.