diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index bc7ef3f8084d..fb157b78b4a9 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -237,6 +237,7 @@ go_test( "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvpb", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/closedts", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/protectedts", "//pkg/roachpb", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 60a9b05f67da..e80ef34552ab 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -310,7 +310,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { pool = ca.knobs.MemMonitor } limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV) - ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit) + ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts) if err != nil { ca.MoveToDraining(err) ca.cancel() @@ -361,6 +361,7 @@ func (ca *changeAggregator) startKVFeed( config ChangefeedConfig, parentMemMon *mon.BytesMonitor, memLimit int64, + opts changefeedbase.StatementOptions, ) (kvevent.Reader, chan struct{}, chan error, error) { cfg := ca.flowCtx.Cfg kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", memLimit, parentMemMon) @@ -371,7 +372,7 @@ func (ca *changeAggregator) startKVFeed( // KVFeed takes ownership of the kvevent.Writer portion of the buffer, while // we return the kvevent.Reader part to the caller. - kvfeedCfg, err := ca.makeKVFeedCfg(ctx, config, spans, buf, initialHighWater, needsInitialScan, kvFeedMemMon) + kvfeedCfg, err := ca.makeKVFeedCfg(ctx, config, spans, buf, initialHighWater, needsInitialScan, kvFeedMemMon, opts) if err != nil { return nil, nil, nil, err } @@ -414,6 +415,7 @@ func (ca *changeAggregator) makeKVFeedCfg( initialHighWater hlc.Timestamp, needsInitialScan bool, memMon *mon.BytesMonitor, + opts changefeedbase.StatementOptions, ) (kvfeed.Config, error) { schemaChange, err := config.Opts.GetSchemaChangeHandlingOptions() if err != nil { @@ -432,29 +434,51 @@ func (ca *changeAggregator) makeKVFeedCfg( initialHighWater, &ca.metrics.SchemaFeedMetrics, config.Opts.GetCanHandle()) } + monitoringCfg, err := makeKVFeedMonitoringCfg(ca.sliMetrics, opts) + if err != nil { + return kvfeed.Config{}, err + } + return kvfeed.Config{ - Writer: buf, - Settings: cfg.Settings, - DB: cfg.DB.KV(), - Codec: cfg.Codec, - Clock: cfg.DB.KV().Clock(), - Spans: spans, - CheckpointSpans: ca.spec.Checkpoint.Spans, - CheckpointTimestamp: ca.spec.Checkpoint.Timestamp, - Targets: AllTargets(ca.spec.Feed), - Metrics: &ca.metrics.KVFeedMetrics, - OnBackfillCallback: ca.sliMetrics.getBackfillCallback(), - OnBackfillRangeCallback: ca.sliMetrics.getBackfillRangeCallback(), - MM: memMon, - InitialHighWater: initialHighWater, - EndTime: config.EndTime, - WithDiff: filters.WithDiff, - NeedsInitialScan: needsInitialScan, - SchemaChangeEvents: schemaChange.EventClass, - SchemaChangePolicy: schemaChange.Policy, - SchemaFeed: sf, - Knobs: ca.knobs.FeedKnobs, - UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV), + Writer: buf, + Settings: cfg.Settings, + DB: cfg.DB.KV(), + Codec: cfg.Codec, + Clock: cfg.DB.KV().Clock(), + Spans: spans, + CheckpointSpans: ca.spec.Checkpoint.Spans, + CheckpointTimestamp: 
ca.spec.Checkpoint.Timestamp, + Targets: AllTargets(ca.spec.Feed), + Metrics: &ca.metrics.KVFeedMetrics, + MM: memMon, + InitialHighWater: initialHighWater, + EndTime: config.EndTime, + WithDiff: filters.WithDiff, + NeedsInitialScan: needsInitialScan, + SchemaChangeEvents: schemaChange.EventClass, + SchemaChangePolicy: schemaChange.Policy, + SchemaFeed: sf, + Knobs: ca.knobs.FeedKnobs, + UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV), + MonitoringCfg: monitoringCfg, + }, nil +} + +func makeKVFeedMonitoringCfg( + sliMetrics *sliMetrics, opts changefeedbase.StatementOptions, +) (kvfeed.MonitoringConfig, error) { + laggingRangesThreshold, laggingRangesInterval, err := opts.GetLaggingRangesConfig() + if err != nil { + return kvfeed.MonitoringConfig{}, err + } + + return kvfeed.MonitoringConfig{ + LaggingRangesCallback: sliMetrics.getLaggingRangesCallback(), + LaggingRangesThreshold: laggingRangesThreshold, + LaggingRangesPollingInterval: laggingRangesInterval, + + OnBackfillCallback: sliMetrics.getBackfillCallback(), + OnBackfillRangeCallback: sliMetrics.getBackfillRangeCallback(), }, nil } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 270d6a4bd2c7..47c303b4396c 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -50,6 +50,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" @@ -1226,6 +1228,122 @@ func TestChangefeedInitialScan(t *testing.T) { cdcTest(t, testFn) } +// TestChangefeedLaggingRangesMetrics tests the behavior of the +// changefeed.lagging_ranges metric. +func TestChangefeedLaggingRangesMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + // Ensure a fast closed timestamp interval so ranges can catch up fast. + kvserver.RangeFeedRefreshInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond) + closedts.SideTransportCloseInterval.Override( + context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond) + closedts.TargetDuration.Override( + context.Background(), &s.Server.ClusterSettings().SV, 20*time.Millisecond) + + skipMu := syncutil.Mutex{} + skippedRanges := map[string]struct{}{} + numRanges := 10 + numRangesToSkip := int64(4) + var stopSkip atomic.Bool + // `shouldSkip` continuously skips checkpoints for the first `numRangesToSkip` ranges it sees. + // skipping is disabled by setting `stopSkip` to true. 
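// (As I read the test: suppressing RangeFeedCheckpoint events keeps the affected
// ranges' resolved timestamps from advancing, which is what makes them count as
// lagging once lagging_ranges_threshold elapses; when skipping stops, checkpoints
// resume, the resolved timestamps catch up, and the gauge drops back to zero.)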
+ shouldSkip := func(event *kvpb.RangeFeedEvent) bool { + if stopSkip.Load() { + return false + } + switch event.GetValue().(type) { + case *kvpb.RangeFeedCheckpoint: + sp := event.Checkpoint.Span + skipMu.Lock() + defer skipMu.Unlock() + if _, ok := skippedRanges[sp.String()]; ok || int64(len(skippedRanges)) < numRangesToSkip { + skippedRanges[sp.String()] = struct{}{} + return true + } + } + return false + } + + knobs := s.TestingKnobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs) + + knobs.FeedKnobs.RangefeedOptions = append(knobs.FeedKnobs.RangefeedOptions, kvcoord.TestingWithOnRangefeedEvent( + func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) { + return shouldSkip(event), nil + }), + ) + + registry := s.Server.JobRegistry().(*jobs.Registry) + sli1, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t1") + require.NoError(t, err) + laggingRangesTier1 := sli1.LaggingRanges + sli2, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("t2") + require.NoError(t, err) + laggingRangesTier2 := sli2.LaggingRanges + + assertLaggingRanges := func(tier string, expected int64) { + testutils.SucceedsWithin(t, func() error { + var laggingRangesObserved int64 + if tier == "t1" { + laggingRangesObserved = laggingRangesTier1.Value() + } else { + laggingRangesObserved = laggingRangesTier2.Value() + } + if laggingRangesObserved != expected { + return fmt.Errorf("expected %d lagging ranges, but found %d", expected, laggingRangesObserved) + } + return nil + }, 10*time.Second) + } + + sqlDB.Exec(t, fmt.Sprintf(` + CREATE TABLE foo (key INT PRIMARY KEY); + INSERT INTO foo (key) SELECT * FROM generate_series(1, %d); + ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d, 1)); + `, numRanges, numRanges-1)) + sqlDB.CheckQueryResults(t, `SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`, + [][]string{{fmt.Sprint(numRanges)}}, + ) + + const laggingRangesOpts = `lagging_ranges_threshold="250ms", lagging_ranges_polling_interval="25ms"` + feed1Tier1 := feed(t, f, + fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1", %s`, laggingRangesOpts)) + feed2Tier1 := feed(t, f, + fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t1", %s`, laggingRangesOpts)) + feed3Tier2 := feed(t, f, + fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH initial_scan='no', metrics_label="t2", %s`, laggingRangesOpts)) + + assertLaggingRanges("t1", numRangesToSkip*2) + assertLaggingRanges("t2", numRangesToSkip) + + stopSkip.Store(true) + assertLaggingRanges("t1", 0) + assertLaggingRanges("t2", 0) + + stopSkip.Store(false) + assertLaggingRanges("t1", numRangesToSkip*2) + assertLaggingRanges("t2", numRangesToSkip) + + require.NoError(t, feed1Tier1.Close()) + assertLaggingRanges("t1", numRangesToSkip) + assertLaggingRanges("t2", numRangesToSkip) + + require.NoError(t, feed2Tier1.Close()) + assertLaggingRanges("t1", 0) + assertLaggingRanges("t2", numRangesToSkip) + + require.NoError(t, feed3Tier2.Close()) + assertLaggingRanges("t1", 0) + assertLaggingRanges("t2", 0) + } + // Can't run on tenants due to lack of SPLIT AT support (#54254) + cdcTest(t, testFn, feedTestNoTenants, feedTestEnterpriseSinks) +} + func TestChangefeedBackfillObservability(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 10aac537bf36..1252ffffb56b 100644 --- 
a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -73,33 +73,35 @@ const ( // Constants for the options. const ( - OptAvroSchemaPrefix = `avro_schema_prefix` - OptConfluentSchemaRegistry = `confluent_schema_registry` - OptCursor = `cursor` - OptCustomKeyColumn = `key_column` - OptEndTime = `end_time` - OptEnvelope = `envelope` - OptFormat = `format` - OptFullTableName = `full_table_name` - OptKeyInValue = `key_in_value` - OptTopicInValue = `topic_in_value` - OptResolvedTimestamps = `resolved` - OptMinCheckpointFrequency = `min_checkpoint_frequency` - OptUpdatedTimestamps = `updated` - OptMVCCTimestamps = `mvcc_timestamp` - OptDiff = `diff` - OptCompression = `compression` - OptSchemaChangeEvents = `schema_change_events` - OptSchemaChangePolicy = `schema_change_policy` - OptSplitColumnFamilies = `split_column_families` - OptExpirePTSAfter = `gc_protect_expires_after` - OptWebhookAuthHeader = `webhook_auth_header` - OptWebhookClientTimeout = `webhook_client_timeout` - OptOnError = `on_error` - OptMetricsScope = `metrics_label` - OptUnordered = `unordered` - OptVirtualColumns = `virtual_columns` - OptExecutionLocality = `execution_locality` + OptAvroSchemaPrefix = `avro_schema_prefix` + OptConfluentSchemaRegistry = `confluent_schema_registry` + OptCursor = `cursor` + OptCustomKeyColumn = `key_column` + OptEndTime = `end_time` + OptEnvelope = `envelope` + OptFormat = `format` + OptFullTableName = `full_table_name` + OptKeyInValue = `key_in_value` + OptTopicInValue = `topic_in_value` + OptResolvedTimestamps = `resolved` + OptMinCheckpointFrequency = `min_checkpoint_frequency` + OptUpdatedTimestamps = `updated` + OptMVCCTimestamps = `mvcc_timestamp` + OptDiff = `diff` + OptCompression = `compression` + OptSchemaChangeEvents = `schema_change_events` + OptSchemaChangePolicy = `schema_change_policy` + OptSplitColumnFamilies = `split_column_families` + OptExpirePTSAfter = `gc_protect_expires_after` + OptWebhookAuthHeader = `webhook_auth_header` + OptWebhookClientTimeout = `webhook_client_timeout` + OptOnError = `on_error` + OptMetricsScope = `metrics_label` + OptUnordered = `unordered` + OptVirtualColumns = `virtual_columns` + OptExecutionLocality = `execution_locality` + OptLaggingRangesThreshold = `lagging_ranges_threshold` + OptLaggingRangesPollingInterval = `lagging_ranges_polling_interval` OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted` OptVirtualColumnsNull VirtualColumnVisibility = `null` @@ -348,6 +350,8 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{ OptUnordered: flagOption, OptVirtualColumns: enum("omitted", "null"), OptExecutionLocality: stringOption, + OptLaggingRangesThreshold: durationOption, + OptLaggingRangesPollingInterval: durationOption, } // CommonOptions is options common to all sinks @@ -360,7 +364,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope, OptOnError, OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered, OptCustomKeyColumn, OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter, - OptExecutionLocality, + OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval, ) // SQLValidOptions is options exclusive to SQL sink @@ -617,6 +621,8 @@ func (s StatementOptions) getEnumValue(k string) (string, error) { return rawVal, nil } +// getDurationValue validates that the option `k` was supplied with a +// valid duration. 
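For context, here is a self-contained sketch of the parse-with-defaults behavior that GetLaggingRangesConfig (added further down in this file) implements. It is not part of the patch: the helper name is mine, the option names and the 3m/1m defaults come from this diff, and the real code validates values through getDurationValue below rather than calling time.ParseDuration directly.

package main

import (
	"fmt"
	"time"
)

// laggingRangesConfig starts from the defaults and overrides a value only if
// the corresponding option was supplied.
func laggingRangesConfig(opts map[string]string) (threshold, interval time.Duration, err error) {
	threshold, interval = 3*time.Minute, time.Minute
	if v, ok := opts["lagging_ranges_threshold"]; ok {
		if threshold, err = time.ParseDuration(v); err != nil {
			return 0, 0, err
		}
	}
	if v, ok := opts["lagging_ranges_polling_interval"]; ok {
		if interval, err = time.ParseDuration(v); err != nil {
			return 0, 0, err
		}
	}
	return threshold, interval, nil
}

func main() {
	threshold, interval, _ := laggingRangesConfig(map[string]string{"lagging_ranges_threshold": "250ms"})
	fmt.Println(threshold, interval) // 250ms 1m0s
}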
func (s StatementOptions) getDurationValue(k string) (*time.Duration, error) { v, ok := s.m[k] if !ok { @@ -935,6 +941,34 @@ func (s StatementOptions) GetMetricScope() (string, bool) { return v, ok } +// GetLaggingRangesConfig returns the threshold and polling rate to use for +// lagging ranges metrics. +func (s StatementOptions) GetLaggingRangesConfig() ( + threshold time.Duration, + pollingInterval time.Duration, + e error, +) { + threshold = DefaultLaggingRangesThreshold + pollingInterval = DefaultLaggingRangesPollingInterval + _, ok := s.m[OptLaggingRangesThreshold] + if ok { + t, err := s.getDurationValue(OptLaggingRangesThreshold) + if err != nil { + return threshold, pollingInterval, err + } + threshold = *t + } + _, ok = s.m[OptLaggingRangesPollingInterval] + if ok { + i, err := s.getDurationValue(OptLaggingRangesPollingInterval) + if err != nil { + return threshold, pollingInterval, err + } + pollingInterval = *i + } + return threshold, pollingInterval, nil +} + // IncludeVirtual returns true if we need to set placeholder nulls for virtual columns. func (s StatementOptions) IncludeVirtual() bool { return s.m[OptVirtualColumns] == string(OptVirtualColumnsNull) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index f5a0f9017ada..63aa89b04003 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -308,3 +308,11 @@ var SinkPacerRequestSize = settings.RegisterDurationSetting( 50*time.Millisecond, settings.PositiveDuration, ) + +// DefaultLaggingRangesThreshold is the default duration by which a range must be +// lagging behind the present to be considered as 'lagging' behind in metrics. +var DefaultLaggingRangesThreshold = 3 * time.Minute + +// DefaultLaggingRangesPollingInterval is the default polling rate at which +// lagging ranges are checked and metrics are updated. +var DefaultLaggingRangesPollingInterval = 1 * time.Minute diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 5147e9d8c92b..4e8d12ea4f95 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -15,6 +15,7 @@ package kvfeed import ( "context" "fmt" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" @@ -30,28 +31,45 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) -// Config configures a kvfeed. -type Config struct { - Settings *cluster.Settings - DB *kv.DB - Codec keys.SQLCodec - Clock *hlc.Clock - Spans []roachpb.Span - CheckpointSpans []roachpb.Span - CheckpointTimestamp hlc.Timestamp - Targets changefeedbase.Targets - Writer kvevent.Writer - Metrics *kvevent.Metrics +// MonitoringConfig is a set of callbacks which the kvfeed calls to provide +// the caller with information about the state of the kvfeed. +type MonitoringConfig struct { + // LaggingRangesCallback is called periodically with the number of lagging ranges + // in the kvfeed. + LaggingRangesCallback func(int64) + // LaggingRangesPollingInterval is how often the kv feed will poll for + // lagging ranges. + LaggingRangesPollingInterval time.Duration + // LaggingRangesThreshold is how far behind a range must be to be considered + // lagging. 
+ LaggingRangesThreshold time.Duration + OnBackfillCallback func() func() OnBackfillRangeCallback func(int64) (func(), func()) - MM *mon.BytesMonitor - WithDiff bool - SchemaChangeEvents changefeedbase.SchemaChangeEventClass - SchemaChangePolicy changefeedbase.SchemaChangePolicy - SchemaFeed schemafeed.SchemaFeed +} + +// Config configures a kvfeed. +type Config struct { + Settings *cluster.Settings + DB *kv.DB + Codec keys.SQLCodec + Clock *hlc.Clock + Spans []roachpb.Span + CheckpointSpans []roachpb.Span + CheckpointTimestamp hlc.Timestamp + Targets changefeedbase.Targets + Writer kvevent.Writer + Metrics *kvevent.Metrics + MonitoringCfg MonitoringConfig + MM *mon.BytesMonitor + WithDiff bool + SchemaChangeEvents changefeedbase.SchemaChangeEventClass + SchemaChangePolicy changefeedbase.SchemaChangePolicy + SchemaFeed schemafeed.SchemaFeed // If true, the feed will begin with a dump of data at exactly the // InitialHighWater. This is a peculiar behavior. In general the @@ -84,7 +102,7 @@ func Run(ctx context.Context, cfg Config) error { sc = &scanRequestScanner{ settings: cfg.Settings, db: cfg.DB, - onBackfillRangeCallback: cfg.OnBackfillRangeCallback, + onBackfillRangeCallback: cfg.MonitoringCfg.OnBackfillRangeCallback, } } var pff physicalFeedFactory @@ -98,6 +116,7 @@ func Run(ctx context.Context, cfg Config) error { return kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics) } + g := ctxgroup.WithContext(ctx) f := newKVFeed( cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp, cfg.SchemaChangeEvents, cfg.SchemaChangePolicy, @@ -106,9 +125,10 @@ func Run(ctx context.Context, cfg Config) error { cfg.Codec, cfg.SchemaFeed, sc, pff, bf, cfg.UseMux, cfg.Targets, cfg.Knobs) - f.onBackfillCallback = cfg.OnBackfillCallback + f.onBackfillCallback = cfg.MonitoringCfg.OnBackfillCallback + f.rangeObserver = startLaggingRangesObserver(g, cfg.MonitoringCfg.LaggingRangesCallback, + cfg.MonitoringCfg.LaggingRangesPollingInterval, cfg.MonitoringCfg.LaggingRangesThreshold) - g := ctxgroup.WithContext(ctx) g.GoCtx(cfg.SchemaFeed.Run) g.GoCtx(f.run) err := g.Wait() @@ -152,6 +172,59 @@ func Run(ctx context.Context, cfg Config) error { return err } +func startLaggingRangesObserver( + g ctxgroup.Group, + updateLaggingRanges func(int64), + pollingInterval time.Duration, + threshold time.Duration, +) func(fn kvcoord.ForEachRangeFn) { + return func(fn kvcoord.ForEachRangeFn) { + g.GoCtx(func(ctx context.Context) error { + // Reset metrics on shutdown. + defer func() { + updateLaggingRanges(0) + }() + + var timer timeutil.Timer + defer timer.Stop() + timer.Reset(pollingInterval) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + timer.Read = true + + count := int64(0) + thresholdTS := timeutil.Now().Add(-1 * threshold) + err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error { + // The resolved timestamp of a range determines the timestamp which is caught up to. + // However, during catchup scans, this is not set. For catchup scans, we consider the + // time the partial rangefeed was created to be its resolved ts. Note that a range can + // restart due to a range split, transient error etc. In these cases you also expect + // to see a `CreatedTime` but no resolved timestamp. 
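// (Concrete example of the rule above, assuming a 250ms threshold: a range whose
// rangefeed restarted 10s ago and has not yet emitted a checkpoint counts as
// lagging, because its CreatedTime is 10s old; a range whose last checkpoint
// resolved 100ms ago does not.)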
+ ts := feed.Resolved + if ts.IsEmpty() { + ts = hlc.Timestamp{WallTime: feed.CreatedTime.UnixNano()} + } + + if ts.Less(hlc.Timestamp{WallTime: thresholdTS.UnixNano()}) { + count += 1 + } + return nil + }) + if err != nil { + return err + } + updateLaggingRanges(count) + timer.Reset(pollingInterval) + } + } + }) + } +} + // schemaChangeDetectedError is a sentinel error to indicate to Run() that the // schema change is stopping due to a schema change. This is handy to trigger // the context group to stop; the error is handled entirely in this package. @@ -175,6 +248,7 @@ type kvFeed struct { codec keys.SQLCodec onBackfillCallback func() func() + rangeObserver func(fn kvcoord.ForEachRangeFn) schemaChangeEvents changefeedbase.SchemaChangeEventClass schemaChangePolicy changefeedbase.SchemaChangePolicy @@ -485,11 +559,12 @@ func (f *kvFeed) runUntilTableEvent( g := ctxgroup.WithContext(ctx) physicalCfg := rangeFeedConfig{ - Spans: stps, - Frontier: resumeFrontier.Frontier(), - WithDiff: f.withDiff, - Knobs: f.knobs, - UseMux: f.useMux, + Spans: stps, + Frontier: resumeFrontier.Frontier(), + WithDiff: f.withDiff, + Knobs: f.knobs, + UseMux: f.useMux, + RangeObserver: f.rangeObserver, } // The following two synchronous calls works as follows: diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index d8332e189828..5ae2c6ca1c3e 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -27,11 +27,12 @@ type physicalFeedFactory interface { } type rangeFeedConfig struct { - Frontier hlc.Timestamp - Spans []kvcoord.SpanTimePair - WithDiff bool - Knobs TestingKnobs - UseMux bool + Frontier hlc.Timestamp + Spans []kvcoord.SpanTimePair + WithDiff bool + RangeObserver func(fn kvcoord.ForEachRangeFn) + Knobs TestingKnobs + UseMux bool } type rangefeedFactory func( @@ -81,6 +82,12 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang if cfg.WithDiff { rfOpts = append(rfOpts, kvcoord.WithDiff()) } + if cfg.RangeObserver != nil { + rfOpts = append(rfOpts, kvcoord.WithRangeObserver(cfg.RangeObserver)) + } + if len(cfg.Knobs.RangefeedOptions) != 0 { + rfOpts = append(rfOpts, cfg.Knobs.RangefeedOptions...) + } g.GoCtx(func(ctx context.Context) error { return p(ctx, cfg.Spans, feed.eventC, rfOpts...) diff --git a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go index 1982d6cdb9d0..541c3a34c23b 100644 --- a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go +++ b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go @@ -33,6 +33,8 @@ type TestingKnobs struct { // ModifyTimestamps is called on the timestamp for each RangefeedMessage // before converting it into a kv event. ModifyTimestamps func(*hlc.Timestamp) + // RangefeedOptions lets the kvfeed override rangefeed settings. + RangefeedOptions []kvcoord.RangeFeedOption } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index cde48c442fc9..276a8bce0cb2 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -70,6 +70,7 @@ type AggMetrics struct { SchemaRegistryRetries *aggmetric.AggCounter AggregatorProgress *aggmetric.AggGauge CheckpointProgress *aggmetric.AggGauge + LaggingRanges *aggmetric.AggGauge // There is always at least 1 sliMetrics created for defaultSLI scope. 
mu struct { @@ -132,6 +133,7 @@ type sliMetrics struct { SchemaRegistryRetries *aggmetric.Counter AggregatorProgress *aggmetric.Gauge CheckpointProgress *aggmetric.Gauge + LaggingRanges *aggmetric.Gauge mu struct { syncutil.Mutex @@ -607,6 +609,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { Measurement: "Unix Timestamp Nanoseconds", Unit: metric.Unit_TIMESTAMP_NS, } + metaLaggingRangePercentage := metric.Metadata{ + Name: "changefeed.lagging_ranges", + Help: "The number of ranges considered to be lagging behind", + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + } functionalGaugeMinFn := func(childValues []int64) int64 { var min int64 @@ -617,6 +625,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { } return min } + // NB: When adding new histograms, use sigFigs = 1. Older histograms // retain significant figures of 2. b := aggmetric.MakeBuilder("scope") @@ -681,6 +690,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { SchemaRegistrations: b.Counter(metaSchemaRegistryRegistrations), AggregatorProgress: b.FunctionalGauge(metaAggregatorProgress, functionalGaugeMinFn), CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn), + LaggingRanges: b.Gauge(metaLaggingRangePercentage), } a.mu.sliMetrics = make(map[string]*sliMetrics) _, err := a.getOrCreateScope(defaultSLIScope) @@ -740,6 +750,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope), SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope), SchemaRegistrations: a.SchemaRegistrations.AddChild(scope), + LaggingRanges: a.LaggingRanges.AddChild(scope), } sm.mu.resolved = make(map[int64]hlc.Timestamp) sm.mu.checkpoint = make(map[int64]hlc.Timestamp) @@ -769,6 +780,31 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { return sm, nil } +// getLaggingRangesCallback returns a function which can be called to update the +// lagging ranges metric. It should be called with the current number of lagging +// ranges. +func (s *sliMetrics) getLaggingRangesCallback() func(int64) { + // Because this gauge is shared between changefeeds in the same metrics scope, + // we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to + // ensure values written by others are not overwritten. The code below is used + // to determine the deltas based on the last known number of lagging ranges. + // + // Example: + // + // Initially there are 0 lagging ranges, so `last` is 0. Assume the gauge + // has an arbitrary value X. + // + // If 10 ranges are behind, last=0,i=10: X.Dec(0 - 10) = X.Inc(10) + // If 3 ranges catch up, last=10,i=7: X.Dec(10 - 7) = X.Dec(3) + // If 4 ranges fall behind, last=7,i=11: X.Dec(7 - 11) = X.Inc(4) + // If 1 lagging range is deleted, last=7,i=10: X.Dec(11-10) = X.Dec(1) + var last int64 + return func(i int64) { + s.LaggingRanges.Dec(last - i) + last = i + } +} + // Metrics are for production monitoring of changefeeds. type Metrics struct { AggMetrics *AggMetrics diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 0dc209f7bb02..98d3d0eeb08d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -89,10 +89,14 @@ func maxConcurrentCatchupScans(sv *settings.Values) int { return int(l) } +// ForEachRangeFn is used to execute `fn` over each range in a rangefeed. 
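A usage sketch for the observer hook defined next. This is not part of the patch; the helper name is mine, while ForEachRangeFn, ActiveRangeFeedIterFn, RangeFeedContext, PartialRangeFeed and WithRangeObserver are the identifiers introduced in this diff. The observer receives the registry iterator once, when the rangefeed starts, and can then poll it for a point-in-time view, much like startLaggingRangesObserver and the test observer do.

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
)

// loggingRangeObserver returns a function suitable for kvcoord.WithRangeObserver.
// It starts a goroutine that periodically iterates the active rangefeeds and
// reports how many ranges are currently being watched.
func loggingRangeObserver(ctx context.Context, every time.Duration) func(kvcoord.ForEachRangeFn) {
	return func(forEachRange kvcoord.ForEachRangeFn) {
		go func() {
			ticker := time.NewTicker(every)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
				}
				var watched int
				_ = forEachRange(func(_ kvcoord.RangeFeedContext, _ kvcoord.PartialRangeFeed) error {
					watched++
					return nil // a non-nil error stops the iteration early
				})
				fmt.Printf("currently watching %d ranges\n", watched)
			}
		}()
	}
}

The observer would be passed via kvcoord.WithRangeObserver alongside the other rangefeed options, as the kvfeed does in physical_kv_feed.go earlier in this diff.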
+type ForEachRangeFn func(fn ActiveRangeFeedIterFn) error + type rangeFeedConfig struct { useMuxRangeFeed bool overSystemTable bool withDiff bool + rangeObserver func(ForEachRangeFn) knobs struct { // onRangefeedEvent invoked on each rangefeed event. @@ -139,6 +143,14 @@ func WithDiff() RangeFeedOption { }) } +// WithRangeObserver is called when the rangefeed starts with a function that +// can be used to iterate over all the ranges. +func WithRangeObserver(observer func(ForEachRangeFn)) RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.rangeObserver = observer + }) +} + // A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) @@ -211,6 +223,9 @@ func (ds *DistSender) RangeFeedSpans( rr := newRangeFeedRegistry(ctx, cfg.withDiff) ds.activeRangeFeeds.Store(rr, nil) defer ds.activeRangeFeeds.Delete(rr) + if cfg.rangeObserver != nil { + cfg.rangeObserver(rr.ForEachPartialRangefeed) + } catchupSem := limit.MakeConcurrentRequestLimiter( "distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV)) @@ -303,34 +318,42 @@ type PartialRangeFeed struct { // ActiveRangeFeedIterFn is an iterator function which is passed PartialRangeFeed structure. // Iterator function may return an iterutil.StopIteration sentinel error to stop iteration -// early; any other error is propagated. +// early. type ActiveRangeFeedIterFn func(rfCtx RangeFeedContext, feed PartialRangeFeed) error -// ForEachActiveRangeFeed invokes provided function for each active range feed. +const continueIter = true +const stopIter = false + +// ForEachActiveRangeFeed invokes provided function for each active rangefeed. +// iterutil.StopIteration can be returned by `fn` to stop iteration, and doing +// so will not return this error. func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr error) { - const continueIter = true - const stopIter = false + ds.activeRangeFeeds.Range(func(k, v interface{}) bool { + r := k.(*rangeFeedRegistry) + iterErr = r.ForEachPartialRangefeed(fn) + return iterErr == nil + }) + return iterutil.Map(iterErr) +} + +// ForEachPartialRangefeed invokes provided function for each partial rangefeed. Use manageIterationErrs +// if the fn uses iterutil.StopIteration to stop iteration. +func (r *rangeFeedRegistry) ForEachPartialRangefeed(fn ActiveRangeFeedIterFn) (iterErr error) { partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed { active.Lock() defer active.Unlock() return active.PartialRangeFeed } - - ds.activeRangeFeeds.Range(func(k, v interface{}) bool { - r := k.(*rangeFeedRegistry) - r.ranges.Range(func(k, v interface{}) bool { - active := k.(*activeRangeFeed) - if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil { - iterErr = err - return stopIter - } - return continueIter - }) - return iterErr == nil + r.ranges.Range(func(k, v interface{}) bool { + active := k.(*activeRangeFeed) + if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil { + iterErr = err + return stopIter + } + return continueIter }) - - return iterutil.Map(iterErr) + return iterErr } // activeRangeFeed is a thread safe PartialRangeFeed. 
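The next hunk assigns the release closure after the struct literal is built. That ordering is what lets the closure refer to active: a variable introduced by := is not in scope inside its own composite literal, so a self-referencing cleanup func has to be attached once the value exists. A minimal standalone illustration (names are mine, not from the patch):

package example

// registryEntry stands in for activeRangeFeed: releasing it removes it from a
// registry, so the closure must capture the entry itself.
type registryEntry struct {
	release func()
}

func newEntry(reg map[*registryEntry]struct{}) *registryEntry {
	e := &registryEntry{}
	// e is in scope here, but it would not be inside the composite literal above.
	e.release = func() { delete(reg, e) }
	reg[e] = struct{}{}
	return e
}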
@@ -445,10 +468,10 @@ func newActiveRangeFeed( StartAfter: startAfter, CreatedTime: timeutil.Now(), }, - release: func() { - rr.ranges.Delete(active) - c.Dec(1) - }, + } + active.release = func() { + rr.ranges.Delete(active) + c.Dec(1) } rr.ranges.Store(active, nil) c.Inc(1) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 2452db21890b..e8014eb13070 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -12,7 +12,9 @@ package kvcoord_test import ( "context" + "fmt" "math/rand" + "reflect" "sync" "sync/atomic" "testing" @@ -805,6 +807,117 @@ func TestRangeFeedMetricsManagement(t *testing.T) { }) } +// TestRangefeedRangeObserver ensures the kvcoord.WithRangeObserver option +// works correctly. +func TestRangefeedRangeObserver(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + kvserver.RangefeedEnabled.Override( + context.Background(), &ts.ClusterSettings().SV, true) + + testutils.RunTrueAndFalse(t, "mux", func(t *testing.T, useMux bool) { + sqlDB.ExecMultiple(t, + `CREATE TABLE foo (key INT PRIMARY KEY)`, + `INSERT INTO foo (key) SELECT * FROM generate_series(1, 4)`, + `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 4, 1))`, + ) + defer func() { + sqlDB.Exec(t, `DROP TABLE foo`) + }() + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + ignoreValues := func(event kvcoord.RangeFeedMessage) {} + + // Set up an observer to continuously poll for the list of ranges + // being watched. + var observedRangesMu syncutil.Mutex + observedRanges := make(map[string]struct{}) + ctx2, cancel := context.WithCancel(context.Background()) + g := ctxgroup.WithContext(ctx2) + defer func() { + cancel() + err := g.Wait() + // Ensure the observer goroutine terminates gracefully via context cancellation. + require.True(t, testutils.IsError(err, "context canceled")) + }() + observer := func(fn kvcoord.ForEachRangeFn) { + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(200 * time.Millisecond): + } + observedRangesMu.Lock() + observedRanges = make(map[string]struct{}) + err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error { + observedRanges[feed.Span.String()] = struct{}{} + return nil + }) + observedRangesMu.Unlock() + if err != nil { + return err + } + } + }) + } + + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, useMux, + kvcoord.WithRangeObserver(observer)) + defer closeFeed() + + makeSpan := func(suffix string) string { + return fmt.Sprintf("/Table/%d/%s", fooDesc.GetID(), suffix) + } + + // The initial set of ranges we expect to observe. 
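// (How I read the pretty-printed spans below: the braces mark the varying part
// of the key, so "1{-/1}" is [/Table/<id>/1, /Table/<id>/1/1) and "{1/4-2}" is
// [/Table/<id>/1/4, /Table/<id>/2); four split points yield five ranges.)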
+ expectedRanges := map[string]struct{}{ + makeSpan("1{-/1}"): {}, + makeSpan("1/{1-2}"): {}, + makeSpan("1/{2-3}"): {}, + makeSpan("1/{3-4}"): {}, + makeSpan("{1/4-2}"): {}, + } + checkExpectedRanges := func() { + testutils.SucceedsWithin(t, func() error { + observedRangesMu.Lock() + defer observedRangesMu.Unlock() + if !reflect.DeepEqual(observedRanges, expectedRanges) { + return errors.Newf("expected ranges %v, but got %v", expectedRanges, observedRanges) + } + return nil + }, 10*time.Second) + } + checkExpectedRanges() + + // Add another range and ensure we can observe it. + sqlDB.ExecMultiple(t, + `INSERT INTO FOO VALUES(5)`, + `ALTER TABLE foo SPLIT AT VALUES(5)`, + ) + expectedRanges = map[string]struct{}{ + makeSpan("1{-/1}"): {}, + makeSpan("1/{1-2}"): {}, + makeSpan("1/{2-3}"): {}, + makeSpan("1/{3-4}"): {}, + makeSpan("1/{4-5}"): {}, + makeSpan("{1/5-2}"): {}, + } + checkExpectedRanges() + }) +} + // TestMuxRangeFeedCanCloseStream verifies stream termination functionality in mux rangefeed. func TestMuxRangeFeedCanCloseStream(t *testing.T) { defer leaktest.AfterTest(t)()