diff --git a/pkg/ccl/changefeedccl/buffer.go b/pkg/ccl/changefeedccl/buffer.go
index ca9c5bc535f4..347aaada4621 100644
--- a/pkg/ccl/changefeedccl/buffer.go
+++ b/pkg/ccl/changefeedccl/buffer.go
@@ -10,10 +10,12 @@ package changefeedccl
 
 import (
 	"context"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )
 
 type bufferEntry struct {
@@ -22,6 +24,8 @@ type bufferEntry struct {
 	// Timestamp of the schema that should be used to read this KV.
 	// If unset (zero-valued), the value's timestamp will be used instead.
 	schemaTimestamp hlc.Timestamp
+	// bufferGetTimestamp is the time this entry came out of the buffer.
+	bufferGetTimestamp time.Time
 }
 
 // buffer mediates between the changed data poller and the rest of the
@@ -67,6 +71,7 @@ func (b *buffer) Get(ctx context.Context) (bufferEntry, error) {
 	case <-ctx.Done():
 		return bufferEntry{}, ctx.Err()
 	case e := <-b.entriesCh:
+		e.bufferGetTimestamp = timeutil.Now()
 		return e, nil
 	}
 }
diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go
index 2d3a88d5ab92..3a4c0813ea0c 100644
--- a/pkg/ccl/changefeedccl/changefeed.go
+++ b/pkg/ccl/changefeedccl/changefeed.go
@@ -61,6 +61,9 @@ type emitEntry struct {
 	// span that no previously unseen entries with a lower or equal updated
 	// timestamp will be emitted.
 	resolved *jobspb.ResolvedSpan
+
+	// bufferGetTimestamp is the time this entry came out of the buffer.
+	bufferGetTimestamp time.Time
 }
 
 // kvsToRows gets changed kvs from a closure and converts them into sql rows. It
@@ -76,6 +79,7 @@ func kvsToRows(
 	var kvs sqlbase.SpanKVFetcher
 	appendEmitEntryForKV := func(
 		ctx context.Context, output []emitEntry, kv roachpb.KeyValue, schemaTimestamp hlc.Timestamp,
+		bufferGetTimestamp time.Time,
 	) ([]emitEntry, error) {
 		// Reuse kvs to save allocations.
 		kvs.KVs = kvs.KVs[:0]
@@ -104,6 +108,7 @@ func kvsToRows(
 
 		for {
 			var r emitEntry
+			r.bufferGetTimestamp = bufferGetTimestamp
 			r.row.datums, r.row.tableDesc, _, err = rf.NextRow(ctx)
 			if err != nil {
 				return nil, err
@@ -140,13 +145,17 @@ func kvsToRows(
 			if input.schemaTimestamp != (hlc.Timestamp{}) {
 				schemaTimestamp = input.schemaTimestamp
 			}
-			output, err = appendEmitEntryForKV(ctx, output, input.kv, schemaTimestamp)
+			output, err = appendEmitEntryForKV(
+				ctx, output, input.kv, schemaTimestamp, input.bufferGetTimestamp)
 			if err != nil {
 				return nil, err
 			}
 		}
 		if input.resolved != nil {
-			output = append(output, emitEntry{resolved: input.resolved})
+			output = append(output, emitEntry{
+				resolved:           input.resolved,
+				bufferGetTimestamp: input.bufferGetTimestamp,
+			})
 		}
 		if output != nil {
 			return output, nil
@@ -166,6 +175,7 @@ func emitEntries(
 	sink Sink,
 	inputFn func(context.Context) ([]emitEntry, error),
 	knobs TestingKnobs,
+	metrics *Metrics,
 ) func(context.Context) ([]jobspb.ResolvedSpan, error) {
 	var scratch bufalloc.ByteAllocator
 	emitRowFn := func(ctx context.Context, row emitRow) error {
@@ -210,6 +220,17 @@ func emitEntries(
 			return nil, err
 		}
 		for _, input := range inputs {
+			if input.bufferGetTimestamp == (time.Time{}) {
+				// We could gracefully handle this instead of panicking, but
+				// we'd really like to be able to reason about this data, so
+				// instead we're defensive. If this is ever seen in prod without
+				// breaking a unit test, then we have a pretty severe test
+				// coverage issue.
+				panic(`unreachable: bufferGetTimestamp is set by all codepaths`)
+			}
+			processingNanos := timeutil.Since(input.bufferGetTimestamp).Nanoseconds()
+			metrics.ProcessingNanos.Inc(processingNanos)
+
 			if input.row.datums != nil {
 				if err := emitRowFn(ctx, input.row); err != nil {
 					return nil, err
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 73555cb803b5..ddbede113f0d 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/pkg/errors"
 )
 
@@ -150,7 +151,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
 	leaseMgr := ca.flowCtx.LeaseManager.(*sql.LeaseManager)
 	ca.poller = makePoller(
 		ca.flowCtx.Settings, ca.flowCtx.ClientDB, ca.flowCtx.ClientDB.Clock(), ca.flowCtx.Gossip,
-		spans, ca.spec.Feed, initialHighWater, buf, leaseMgr,
+		spans, ca.spec.Feed, initialHighWater, buf, leaseMgr, metrics,
 	)
 
 	rowsFn := kvsToRows(leaseMgr, ca.spec.Feed, buf.Get)
@@ -158,7 +159,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
 	if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
 		knobs = *cfKnobs
 	}
-	ca.tickFn = emitEntries(ca.flowCtx.Settings, ca.spec.Feed, ca.encoder, ca.sink, rowsFn, knobs)
+	ca.tickFn = emitEntries(ca.flowCtx.Settings, ca.spec.Feed, ca.encoder, ca.sink, rowsFn, knobs, metrics)
 
 	// Give errCh enough buffer for both possible errors from supporting
 	// goroutines, but only the first one is ever used.
@@ -300,6 +301,8 @@ type changeFrontier struct {
 	freqEmitResolved time.Duration
 	// lastEmitResolved is the last time a resolved timestamp was emitted.
 	lastEmitResolved time.Time
+	// lastSlowSpanLog is the last time a slow span from `sf` was logged.
+	lastSlowSpanLog time.Time
 
 	// jobProgressedFn, if non-nil, is called to checkpoint the changefeed's
 	// progress in the corresponding system job entry.
@@ -497,7 +500,9 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
 	if err := protoutil.Unmarshal([]byte(*raw), &resolved); err != nil {
 		return errors.Wrapf(err, `unmarshalling resolved span: %x`, raw)
 	}
-	if cf.sf.Forward(resolved.Span, resolved.Timestamp) {
+
+	frontierChanged := cf.sf.Forward(resolved.Span, resolved.Timestamp)
+	if frontierChanged {
 		newResolved := cf.sf.Frontier()
 		cf.metrics.mu.Lock()
 		if cf.metricsID != -1 {
@@ -517,6 +522,25 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
 			cf.lastEmitResolved = newResolved.GoTime()
 		}
 	}
+
+	// Potentially log the most behind span in the frontier for debugging.
+	slownessThreshold := 10 * changefeedPollInterval.Get(&cf.flowCtx.Settings.SV)
+	frontier := cf.sf.Frontier()
+	now := timeutil.Now()
+	if resolvedBehind := now.Sub(frontier.GoTime()); resolvedBehind > slownessThreshold {
+		if frontierChanged {
+			log.Infof(cf.Ctx, "job %d new resolved timestamp %s is behind by %s",
+				cf.spec.JobID, frontier, resolvedBehind)
+		}
+		const slowSpanMaxFrequency = 10 * time.Second
+		if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency {
+			cf.lastSlowSpanLog = now
+			s := cf.sf.peekFrontierSpan()
+			log.Infof(cf.Ctx, "job %d span [%s,%s) is behind by %s",
+				cf.spec.JobID, s.Key, s.EndKey, resolvedBehind)
+		}
+	}
+
 	return nil
 }
diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go
index 47112e4057df..7675605886f3 100644
--- a/pkg/ccl/changefeedccl/helpers_test.go
+++ b/pkg/ccl/changefeedccl/helpers_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -124,11 +125,12 @@ func createBenchmarkChangefeed(
 	encoder := makeJSONEncoder(details.Opts)
 	sink := makeBenchSink()
 
+	metrics := MakeMetrics(server.DefaultHistogramWindowInterval).(*Metrics)
 	buf := makeBuffer()
 	leaseMgr := s.LeaseManager().(*sql.LeaseManager)
 	poller := makePoller(
-		s.ClusterSettings(), s.DB(), feedClock, s.Gossip(),
-		spans, details, initialHighWater, buf, leaseMgr,
+		s.ClusterSettings(), s.DB(), feedClock, s.Gossip(), spans, details, initialHighWater, buf,
+		leaseMgr, metrics,
 	)
 
 	th := makeTableHistory(func(context.Context, *sqlbase.TableDescriptor) error { return nil }, initialHighWater)
@@ -139,7 +141,8 @@ func createBenchmarkChangefeed(
 		m:  th,
 	}
 	rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), details, buf.Get)
-	tickFn := emitEntries(s.ClusterSettings(), details, encoder, sink, rowsFn, TestingKnobs{})
+	tickFn := emitEntries(
+		s.ClusterSettings(), details, encoder, sink, rowsFn, TestingKnobs{}, metrics)
 
 	ctx, cancel := context.WithCancel(ctx)
 	go func() { _ = poller.Run(ctx) }()
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index 78f6231d9854..7b57a7f8b638 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -11,6 +11,7 @@ package changefeedccl
 import (
 	"context"
 	"math"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -81,21 +82,43 @@ var (
 		Measurement: "Bytes",
 		Unit:        metric.Unit_BYTES,
 	}
-	metaChangefeedEmitNanos = metric.Metadata{
-		Name:        "changefeed.emit_nanos",
-		Help:        "Total time spent emitting all feeds",
-		Measurement: "Nanoseconds",
-		Unit:        metric.Unit_NANOSECONDS,
-	}
-	// This is more naturally a histogram but that creates a lot of timeseries
-	// and it's not clear that the additional fidelity is worth it. Revisit if
-	// evidence suggests otherwise.
 	metaChangefeedFlushes = metric.Metadata{
 		Name:        "changefeed.flushes",
 		Help:        "Total flushes across all feeds",
 		Measurement: "Flushes",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaChangefeedSinkErrorRetries = metric.Metadata{
+		Name:        "changefeed.sink_error_retries",
+		Help:        "Total retryable errors encountered while emitting to sinks",
+		Measurement: "Errors",
+		Unit:        metric.Unit_COUNT,
+	}
+
+	metaChangefeedPollRequestNanos = metric.Metadata{
+		Name:        "changefeed.poll_request_nanos",
+		Help:        "Time spent fetching changes",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaChangefeedProcessingNanos = metric.Metadata{
+		Name:        "changefeed.processing_nanos",
+		Help:        "Time spent processing KV changes into SQL rows",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaChangefeedTableMetadataNanos = metric.Metadata{
+		Name:        "changefeed.table_metadata_nanos",
+		Help:        "Time blocked while verifying table metadata histories",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaChangefeedEmitNanos = metric.Metadata{
+		Name:        "changefeed.emit_nanos",
+		Help:        "Total time spent emitting all feeds",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
 	metaChangefeedFlushNanos = metric.Metadata{
 		Name:        "changefeed.flush_nanos",
 		Help:        "Total time spent flushing all feeds",
@@ -113,25 +136,25 @@ var (
 		Measurement: "Nanoseconds",
 		Unit:        metric.Unit_TIMESTAMP_NS,
 	}
-	metaChangefeedSinkErrorRetries = metric.Metadata{
-		Name:        "changefeed.sink_error_retries",
-		Help:        "Total retryable errors encountered while emitting to sinks",
-		Measurement: "Errors",
-		Unit:        metric.Unit_COUNT,
-	}
 )
 
 const noMinHighWaterSentinel = int64(math.MaxInt64)
 
+const pollRequestNanosHistMaxLatency = time.Hour
+
 // Metrics are for production monitoring of changefeeds.
 type Metrics struct {
 	EmittedMessages  *metric.Counter
 	EmittedBytes     *metric.Counter
-	EmitNanos        *metric.Counter
 	Flushes          *metric.Counter
-	FlushNanos       *metric.Counter
 	SinkErrorRetries *metric.Counter
+
+	PollRequestNanosHist *metric.Histogram
+	ProcessingNanos      *metric.Counter
+	TableMetadataNanos   *metric.Counter
+	EmitNanos            *metric.Counter
+	FlushNanos           *metric.Counter
+
 	mu struct {
 		syncutil.Mutex
 		id       int
@@ -144,14 +167,33 @@ type Metrics struct {
 
 func (*Metrics) MetricStruct() {}
 
 // MakeMetrics makes the metrics for changefeed monitoring.
-func MakeMetrics() metric.Struct {
+func MakeMetrics(histogramWindow time.Duration) metric.Struct {
 	m := &Metrics{
 		EmittedMessages:  metric.NewCounter(metaChangefeedEmittedMessages),
 		EmittedBytes:     metric.NewCounter(metaChangefeedEmittedBytes),
-		EmitNanos:        metric.NewCounter(metaChangefeedEmitNanos),
 		Flushes:          metric.NewCounter(metaChangefeedFlushes),
-		FlushNanos:       metric.NewCounter(metaChangefeedFlushNanos),
 		SinkErrorRetries: metric.NewCounter(metaChangefeedSinkErrorRetries),
+
+		// Metrics for changefeed performance debugging:
+		// - PollRequestNanos and PollRequestNanosHist: things are first
+		//   fetched with some limited concurrency. We're interested in both
+		//   the total amount of time fetching as well as outliers, so we need
+		//   both the counter and the histogram.
+		// - N/A. Each change is put into a buffer. Right now nothing measures
+		//   this since the buffer doesn't actually buffer and so it just
+		//   tracks the poll sleep time.
+		// - ProcessingNanos. Everything from the buffer until the SQL row is
+		//   about to be emitted. This includes TableMetadataNanos, which is
+		//   dependent on network calls, so also tracked in case it's ever the
+		//   cause of a ProcessingNanos blowup.
+		// - EmitNanos and FlushNanos. All of our interactions with the sink.
+		PollRequestNanosHist: metric.NewHistogram(
+			metaChangefeedPollRequestNanos, histogramWindow,
+			pollRequestNanosHistMaxLatency.Nanoseconds(), 1),
+		ProcessingNanos:    metric.NewCounter(metaChangefeedProcessingNanos),
+		TableMetadataNanos: metric.NewCounter(metaChangefeedTableMetadataNanos),
+		EmitNanos:          metric.NewCounter(metaChangefeedEmitNanos),
+		FlushNanos:         metric.NewCounter(metaChangefeedFlushNanos),
 	}
 	m.mu.resolved = make(map[int]hlc.Timestamp)
 	m.MinHighWater = metric.NewFunctionalGauge(metaChangefeedMinHighWater, func() int64 {
diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go
index cf8d8b2201f5..852cc2accdae 100644
--- a/pkg/ccl/changefeedccl/poller.go
+++ b/pkg/ccl/changefeedccl/poller.go
@@ -52,6 +52,7 @@ type poller struct {
 	buf       *buffer
 	tableHist *tableHistory
 	leaseMgr  *sql.LeaseManager
+	metrics   *Metrics
 
 	mu struct {
 		syncutil.Mutex
@@ -82,6 +83,7 @@ func makePoller(
 	highWater hlc.Timestamp,
 	buf *buffer,
 	leaseMgr *sql.LeaseManager,
+	metrics *Metrics,
 ) *poller {
 	p := &poller{
 		settings: settings,
@@ -93,6 +95,7 @@ func makePoller(
 		details:  details,
 		buf:      buf,
 		leaseMgr: leaseMgr,
+		metrics:  metrics,
 	}
 	p.mu.previousTableVersion = make(map[sqlbase.ID]*sqlbase.TableDescriptor)
 	// If no highWater is specified, set the highwater to the statement time
@@ -137,10 +140,12 @@ func (p *poller) Run(ctx context.Context) error {
 
 		nextHighWater := p.clock.Now()
 
+		tableMetadataStart := timeutil.Now()
 		// Ingest table descriptors up to the next prospective highwater.
 		if err := p.updateTableHistory(ctx, nextHighWater); err != nil {
 			return err
 		}
+		p.metrics.TableMetadataNanos.Inc(timeutil.Since(tableMetadataStart).Nanoseconds())
 
 		// Determine if we are at a scanBoundary, and trigger a full scan if needed.
 		isFullScan := false
@@ -381,8 +386,6 @@ func getSpansToProcess(
 func (p *poller) exportSpansParallel(
 	ctx context.Context, spans []roachpb.Span, start, end hlc.Timestamp, isFullScan bool,
 ) error {
-	sender := p.db.NonTransactionalSender()
-
 	// Export requests for the various watched spans are executed in parallel,
 	// with a semaphore-enforced limit based on a cluster setting.
 	maxConcurrentExports := clusterNodeCount(p.gossip) *
@@ -396,7 +399,7 @@ func (p *poller) exportSpansParallel(
 	for _, span := range spans {
 		span := span
 
-		// Wait for our sempahore.
+		// Wait for our semaphore.
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -405,56 +408,30 @@ func (p *poller) exportSpansParallel(
 		g.GoCtx(func(ctx context.Context) error {
 			defer func() { <-exportsSem }()
 
-			if log.V(2) {
-				log.Infof(ctx, `sending ExportRequest [%s,%s)`, span.Key, span.EndKey)
-			}
-
-			stopwatchStart := timeutil.Now()
-			exported, pErr := exportSpan(ctx, span, sender, start, end, isFullScan)
+			err := p.exportSpan(ctx, span, start, end, isFullScan)
 			finished := atomic.AddInt64(&atomicFinished, 1)
 			if log.V(2) {
-				log.Infof(ctx, `finished ExportRequest [%s,%s) %d of %d took %s`,
-					span.Key, span.EndKey, finished, len(spans), timeutil.Since(stopwatchStart))
+				log.Infof(ctx, `exported %d of %d`, finished, len(spans))
 			}
-			if pErr != nil {
-				return errors.Wrapf(
-					pErr.GoError(), `fetching changes for [%s,%s)`, span.Key, span.EndKey,
-				)
-			}
-
-			// When outputting a full scan, we want to use the schema at the scan
-			// timestamp, not the schema at the value timestamp.
-			var schemaTimestamp hlc.Timestamp
-			if isFullScan {
-				schemaTimestamp = end
-			}
-			stopwatchStart = timeutil.Now()
-			for _, file := range exported.(*roachpb.ExportResponse).Files {
-				if err := p.slurpSST(ctx, file.SST, schemaTimestamp); err != nil {
-					return err
-				}
-			}
-			if err := p.buf.AddResolved(ctx, span, end); err != nil {
+			if err != nil {
 				return err
 			}
-
-			if log.V(2) {
-				log.Infof(ctx, `finished buffering [%s,%s) took %s`,
-					span.Key, span.EndKey, timeutil.Since(stopwatchStart))
-			}
 			return nil
 		})
 	}
 	return g.Wait()
 }
 
-func exportSpan(
-	ctx context.Context,
-	span roachpb.Span,
-	sender client.Sender,
-	start, end hlc.Timestamp,
-	fullScan bool,
-) (roachpb.Response, *roachpb.Error) {
+func (p *poller) exportSpan(
+	ctx context.Context, span roachpb.Span, start, end hlc.Timestamp, isFullScan bool,
+) error {
+	sender := p.db.NonTransactionalSender()
+	if log.V(2) {
+		log.Infof(ctx, `sending ExportRequest [%s,%s) over (%s,%s]`,
+			span.Key, span.EndKey, start, end)
+	}
+
 	header := roachpb.Header{Timestamp: end}
 	req := &roachpb.ExportRequest{
 		RequestHeader: roachpb.RequestHeaderFromSpan(span),
@@ -463,11 +440,52 @@ func exportSpan(
 		ReturnSST:    true,
 		OmitChecksum: true,
 	}
-	if fullScan {
+	if isFullScan {
 		req.MVCCFilter = roachpb.MVCCFilter_Latest
 		req.StartTime = hlc.Timestamp{}
 	}
-	return client.SendWrappedWith(ctx, sender, header, req)
+
+	stopwatchStart := timeutil.Now()
+	exported, pErr := client.SendWrappedWith(ctx, sender, header, req)
+	exportDuration := timeutil.Since(stopwatchStart)
+	if log.V(2) {
+		log.Infof(ctx, `finished ExportRequest [%s,%s) over (%s,%s] took %s`,
+			span.Key, span.EndKey, start, end, exportDuration)
+	}
+	slowExportThreshold := 10 * changefeedPollInterval.Get(&p.settings.SV)
+	if exportDuration > slowExportThreshold {
+		log.Infof(ctx, "finished ExportRequest [%s,%s) over (%s,%s] took %s behind by %s",
+			span.Key, span.EndKey, start, end, exportDuration, timeutil.Since(end.GoTime()))
+	}
+
+	if pErr != nil {
+		return errors.Wrapf(
+			pErr.GoError(), `fetching changes for [%s,%s)`, span.Key, span.EndKey,
+		)
+	}
+	p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds())
+
+	// When outputting a full scan, we want to use the schema at the scan
+	// timestamp, not the schema at the value timestamp.
+	var schemaTimestamp hlc.Timestamp
+	if isFullScan {
+		schemaTimestamp = end
+	}
+	stopwatchStart = timeutil.Now()
+	for _, file := range exported.(*roachpb.ExportResponse).Files {
+		if err := p.slurpSST(ctx, file.SST, schemaTimestamp); err != nil {
+			return err
+		}
+	}
+	if err := p.buf.AddResolved(ctx, span, end); err != nil {
+		return err
+	}
+
+	if log.V(2) {
+		log.Infof(ctx, `finished buffering [%s,%s) took %s`,
+			span.Key, span.EndKey, timeutil.Since(stopwatchStart))
+	}
+	return nil
 }
 
 func (p *poller) updateTableHistory(ctx context.Context, endTS hlc.Timestamp) error {
diff --git a/pkg/ccl/changefeedccl/span_frontier.go b/pkg/ccl/changefeedccl/span_frontier.go
index 646c34634412..3874894ba567 100644
--- a/pkg/ccl/changefeedccl/span_frontier.go
+++ b/pkg/ccl/changefeedccl/span_frontier.go
@@ -130,6 +130,13 @@ func (s *spanFrontier) Frontier() hlc.Timestamp {
 	return s.minHeap[0].ts
 }
 
+func (s *spanFrontier) peekFrontierSpan() roachpb.Span {
+	if s.minHeap.Len() == 0 {
+		return roachpb.Span{}
+	}
+	return s.minHeap[0].span
+}
+
 // Forward advances the timestamp for a span. Any part of the span that doesn't
 // overlap the tracked span set will be ignored. True is returned if the
 // frontier advanced as a result.
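The new `peekFrontierSpan` helper and the slow-span logging in `noteResolvedSpan` combine two guards: a lag threshold (10x the changefeed poll interval) and a minimum 10s gap between log lines, tracked in `lastSlowSpanLog`. Below is a minimal, self-contained Go sketch of that rate-limiting pattern; the `slowSpanLogger` type and its field names are illustrative stand-ins, not part of this patch:

```go
package main

import (
	"fmt"
	"time"
)

// slowSpanLogger mirrors the lastSlowSpanLog bookkeeping added to
// changeFrontier: log only when the frontier is far enough behind, and at
// most once per maxFrequency.
type slowSpanLogger struct {
	threshold    time.Duration // e.g. 10 * the changefeed poll interval
	maxFrequency time.Duration // e.g. 10 * time.Second in the patch
	last         time.Time     // corresponds to cf.lastSlowSpanLog
}

// shouldLog reports whether a slow-span line should be emitted for a
// frontier that is resolvedBehind behind now, recording the emission time.
func (l *slowSpanLogger) shouldLog(now time.Time, resolvedBehind time.Duration) bool {
	if resolvedBehind <= l.threshold {
		return false
	}
	if now.Sub(l.last) <= l.maxFrequency {
		return false
	}
	l.last = now
	return true
}

func main() {
	l := &slowSpanLogger{threshold: 10 * time.Second, maxFrequency: 10 * time.Second}
	start := time.Now()
	for _, offset := range []time.Duration{0, time.Second, 11 * time.Second} {
		// Prints: 0s true, 1s false, 11s true.
		fmt.Println(offset, l.shouldLog(start.Add(offset), 30*time.Second))
	}
}
```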
diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go
index cbf2162db13a..2394a70aaf06 100644
--- a/pkg/jobs/metrics.go
+++ b/pkg/jobs/metrics.go
@@ -14,7 +14,11 @@
 
 package jobs
 
-import "github.com/cockroachdb/cockroach/pkg/util/metric"
+import (
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/util/metric"
+)
 
 // Metrics are for production monitoring of each job type.
 type Metrics struct {
@@ -25,12 +29,12 @@ type Metrics struct {
 func (Metrics) MetricStruct() {}
 
 // InitHooks initializes the metrics for job monitoring.
-func (m *Metrics) InitHooks() {
+func (m *Metrics) InitHooks(histogramWindowInterval time.Duration) {
 	if MakeChangefeedMetricsHook != nil {
-		m.Changefeed = MakeChangefeedMetricsHook()
+		m.Changefeed = MakeChangefeedMetricsHook(histogramWindowInterval)
 	}
 }
 
 // MakeChangefeedMetricsHook allows for registration of changefeed metrics from
 // ccl code.
-var MakeChangefeedMetricsHook func() metric.Struct
+var MakeChangefeedMetricsHook func(time.Duration) metric.Struct
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index f57e19eb0a0d..aad4d0f5471a 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -138,6 +138,7 @@ func MakeRegistry(
 	ex sqlutil.InternalExecutor,
 	nodeID *base.NodeIDContainer,
 	settings *cluster.Settings,
+	histogramWindowInterval time.Duration,
 	planFn planHookMaker,
 ) *Registry {
 	r := &Registry{
@@ -152,7 +153,7 @@ func MakeRegistry(
 	}
 	r.mu.epoch = 1
 	r.mu.jobs = make(map[int64]context.CancelFunc)
-	r.metrics.InitHooks()
+	r.metrics.InitHooks(histogramWindowInterval)
 	return r
 }
diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go
index 4f65d0cc3633..1f2d4366b085 100644
--- a/pkg/jobs/registry_external_test.go
+++ b/pkg/jobs/registry_external_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
@@ -91,7 +92,7 @@ func TestRegistryResumeExpiredLease(t *testing.T) {
 		nodeID.Reset(id)
 		r := jobs.MakeRegistry(
 			ac, s.Stopper(), clock, db, s.InternalExecutor().(sqlutil.InternalExecutor),
-			nodeID, s.ClusterSettings(), jobs.FakePHS,
+			nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS,
 		)
 		if err := r.Start(ctx, s.Stopper(), nodeLiveness, cancelInterval, adoptInterval); err != nil {
 			t.Fatal(err)
diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go
index 0688fbd0d42d..390632a82b0d 100644
--- a/pkg/jobs/registry_test.go
+++ b/pkg/jobs/registry_test.go
@@ -39,13 +39,17 @@ func TestRegistryCancelation(t *testing.T) {
 	ctx, stopper := context.Background(), stop.NewStopper()
 	defer stopper.Stop(ctx)
 
+	// Not using the server.DefaultHistogramWindowInterval constant because
+	// of a dep cycle.
+	const histogramWindowInterval = 60 * time.Second
+
 	var db *client.DB
 	// Insulate this test from wall time.
 	mClock := hlc.NewManualClock(hlc.UnixNano())
 	clock := hlc.NewClock(mClock.UnixNano, time.Nanosecond)
 	registry := MakeRegistry(
 		log.AmbientContext{}, stopper, clock, db, nil /* ex */, FakeNodeID, cluster.NoSettings,
-		FakePHS)
+		histogramWindowInterval, FakePHS)
 
 	const nodeCount = 1
 	nodeLiveness := NewFakeNodeLiveness(nodeCount)
diff --git a/pkg/server/config.go b/pkg/server/config.go
index 030cdb4573fe..c1acad98bce5 100644
--- a/pkg/server/config.go
+++ b/pkg/server/config.go
@@ -57,8 +57,9 @@ const (
 	defaultScanMaxIdleTime = 1 * time.Second
 	// NB: this can't easily become a variable as the UI hard-codes it to 10s.
 	// See https://github.com/cockroachdb/cockroach/issues/20310.
-	DefaultMetricsSampleInterval = 10 * time.Second
-	defaultStorePath             = "cockroach-data"
+	DefaultMetricsSampleInterval  = 10 * time.Second
+	DefaultHistogramWindowInterval = 6 * DefaultMetricsSampleInterval
+	defaultStorePath              = "cockroach-data"
 	// TempDirPrefix is the filename prefix of any temporary subdirectory
 	// created.
 	TempDirPrefix = "cockroach-temp"
@@ -276,7 +277,7 @@ type Config struct {
 // metrics. For more information on the issues underlying our histogram system
 // and the proposed fixes, please see issue #7896.
 func (cfg Config) HistogramWindowInterval() time.Duration {
-	hwi := DefaultMetricsSampleInterval * 6
+	hwi := DefaultHistogramWindowInterval
 
 	// Rudimentary overflow detection; this can result if
 	// DefaultMetricsSampleInterval is set to an extremely large number, likely
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 2a2cacc2596c..c428a7371744 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -492,6 +492,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		internalExecutor,
 		&s.nodeIDContainer,
 		st,
+		s.cfg.HistogramWindowInterval(),
 		func(opName, user string) (interface{}, func()) {
 			// This is a hack to get around a Go package dependency cycle. See comment
 			// in sql/jobs/registry.go on planHookMaker.
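Taken together, the changefeedccl side of this patch stamps every `bufferEntry` as it leaves the buffer (`buffer.Get`) and charges the elapsed time to `ProcessingNanos` just before emission, panicking on an unset stamp rather than silently skewing the metric. A minimal sketch of that accounting, with a plain `entry` struct and an int64 standing in for the patch's `emitEntry` and `Metrics.ProcessingNanos` counter:

```go
package main

import (
	"fmt"
	"time"
)

// entry stands in for the patch's emitEntry; in the real code, buffer.Get
// sets bufferGetTimestamp via timeutil.Now() as the entry is handed out.
type entry struct {
	bufferGetTimestamp time.Time
	payload            string
}

func main() {
	var processingNanos int64 // stand-in for the Metrics.ProcessingNanos counter

	e := entry{bufferGetTimestamp: time.Now(), payload: "kv"}

	// ... KV-to-row conversion and encoding would happen here ...
	time.Sleep(5 * time.Millisecond)

	if e.bufferGetTimestamp.IsZero() {
		// The patch panics instead of skipping: a zero stamp means some
		// codepath forgot to set it, which would silently skew the metric.
		panic(`unreachable: bufferGetTimestamp is set by all codepaths`)
	}
	processingNanos += time.Since(e.bufferGetTimestamp).Nanoseconds()
	fmt.Printf("charged %s to changefeed.processing_nanos\n", time.Duration(processingNanos))
}
```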