Skip to content

Commit

Permalink
changefeedccl: create separate root changefeed memory monitor
Browse files Browse the repository at this point in the history
This commit introduces a "root" changefeed memory monitor that will be
the parent for all CDC DistSQL flows. The primary motivation behind
this change is to disconnect the CDC flow from the planner's monitor
(which is the "txn" monitor of the connection that started the changefeed).
This is needed since that connection can be closed _before_ the CDC flow
is cleaned up, which triggers the "short-living non-stopped monitor"
assertion. An additional benefit of this change is improved
observability since we can now easily track the memory usage across all
changefeeds.

Release note: None
  • Loading branch information
yuzefovich committed Jun 8, 2024
1 parent 4b0433b commit 68a3285
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 40 deletions.
11 changes: 7 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,15 @@ func startDistChangefeed(

jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// CDC DistSQL flows are long-living, so we want to mark the flow memory
// monitor accordingly.
planCtx.MarkFlowMonitorAsLongLiving = true
// Make sure to use the special changefeed monitor going forward as the
// parent monitor for the DistSQL infrastructure. This is needed to
// prevent a race between the connection that started the changefeed
// closing (which closes the current planner's monitor) and the changefeed
// DistSQL flow being cleaned up.
planCtx.OverridePlannerMon = execCfg.DistSQLSrv.ChangefeedMonitor
// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
// p is the physical plan, recv is the distsqlreceiver
// p is the physical plan, recv is the DistSQLReceiver.
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)
return resultRows.Err()
}
Expand Down
11 changes: 1 addition & 10 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ func newChangeAggregatorProcessor(
}()

memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changeagg-mem")
// CDC DistSQL flows are long-living, so we mark the memory monitors
// accordingly.
memMonitor.MarkLongLiving()
ca := &changeAggregator{
flowCtx: flowCtx,
spec: spec,
Expand Down Expand Up @@ -399,10 +396,7 @@ func (ca *changeAggregator) startKVFeed(
opts changefeedbase.StatementOptions,
) (kvevent.Reader, chan struct{}, chan error, error) {
cfg := ca.flowCtx.Cfg
// CDC DistSQL flows are long-living, so we mark the memory monitors
// accordingly.
const longLiving = true
kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", memLimit, parentMemMon, longLiving)
kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", memLimit, parentMemMon, false /* longLiving */)
kvFeedMemMon.StartNoReserved(ctx, parentMemMon)
buf := kvevent.NewThrottlingBuffer(
kvevent.NewMemBuffer(kvFeedMemMon.MakeBoundAccount(), &cfg.Settings.SV, &ca.metrics.KVFeedMetrics),
Expand Down Expand Up @@ -1122,9 +1116,6 @@ func newChangeFrontierProcessor(
post *execinfrapb.PostProcessSpec,
) (execinfra.Processor, error) {
memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changefntr-mem")
// CDC DistSQL flows are long-living, so we mark the memory monitors
// accordingly.
memMonitor.MarkLongLiving()
sf, err := makeSchemaChangeFrontier(hlc.Timestamp{}, spec.TrackedSpans...)
if err != nil {
return nil, err
Expand Down
36 changes: 36 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,42 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
return m
}

// Metadata for the changefeed memory usage metrics built by
// MakeMemoryMetrics. Both metrics are reported in bytes.
var (
// metaMemMaxBytes describes the "sql.mem.changefeed.max" metric; it is used
// as the Metadata of the histogram created in MakeMemoryMetrics.
metaMemMaxBytes = metric.Metadata{
Name: "sql.mem.changefeed.max",
Help: "Maximum memory usage across all changefeeds",
Measurement: "Memory",
Unit: metric.Unit_BYTES,
}
// metaMemCurBytes describes the "sql.mem.changefeed.current" metric; it is
// used to create the gauge in MakeMemoryMetrics.
metaMemCurBytes = metric.Metadata{
Name: "sql.mem.changefeed.current",
Help: "Current memory usage across all changefeeds",
Measurement: "Memory",
Unit: metric.Unit_BYTES,
}
)

// log10int64times1000 is log10(math.MaxInt64) * 1000, rounded up somewhat
// (log10(math.MaxInt64) is roughly 18.96, which is rounded up to 19 here). It
// is passed as the MaxVal of the histogram created in MakeMemoryMetrics.
// See pkg/sql/mem_metrics.go.
const log10int64times1000 = 19 * 1000

// MakeMemoryMetrics builds the pair of metrics used for changefeed memory
// monitoring: a gauge for the current memory usage across all changefeeds
// and a histogram for the maximum memory usage across all changefeeds.
//
// histogramWindow is used as the Duration of the returned histogram.
func MakeMemoryMetrics(
	histogramWindow time.Duration,
) (curCount *metric.Gauge, maxHist metric.IHistogram) {
	// Assemble the histogram configuration up front so that the return
	// statement only has to invoke the two constructors.
	histOpts := metric.HistogramOptions{
		Metadata:     metaMemMaxBytes,
		Duration:     histogramWindow,
		MaxVal:       log10int64times1000,
		SigFigs:      3,
		BucketConfig: metric.MemoryUsage64MBBuckets,
	}
	return metric.NewGauge(metaMemCurBytes), metric.NewHistogram(histOpts)
}

// init installs MakeMetrics and MakeMemoryMetrics as the jobs package hooks
// for changefeed metrics (jobs.MakeChangefeedMetricsHook and
// jobs.MakeChangefeedMemoryMetricsHook respectively).
func init() {
jobs.MakeChangefeedMetricsHook = MakeMetrics
jobs.MakeChangefeedMemoryMetricsHook = MakeMemoryMetrics
}
4 changes: 4 additions & 0 deletions pkg/jobs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) {
// ccl code.
var MakeChangefeedMetricsHook func(time.Duration) metric.Struct

// MakeChangefeedMemoryMetricsHook allows for registration of changefeed memory
// metrics from ccl code.
var MakeChangefeedMemoryMetricsHook func(time.Duration) (curCount *metric.Gauge, maxHist metric.IHistogram)

// MakeStreamIngestMetricsHook allows for registration of streaming metrics from
// ccl code.
var MakeStreamIngestMetricsHook func(duration time.Duration) metric.Struct
Expand Down
14 changes: 12 additions & 2 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,17 +696,26 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
bulkMetrics := bulk.MakeBulkMetrics(cfg.HistogramWindowInterval())
cfg.registry.AddMetricStruct(bulkMetrics)
bulkMemoryMonitor.SetMetrics(bulkMetrics.CurBytesCount, bulkMetrics.MaxBytesHist)
bulkMemoryMonitor.StartNoReserved(context.Background(), rootSQLMemoryMonitor)
bulkMemoryMonitor.StartNoReserved(ctx, rootSQLMemoryMonitor)

backfillMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backfill-mon")
backfillMemoryMonitor.MarkLongLiving()
backupMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backup-mon")
backupMemoryMonitor.MarkLongLiving()

changefeedMemoryMonitor := mon.NewMonitorInheritWithLimit(
"changefeed-mon", 0 /* limit */, rootSQLMemoryMonitor, true, /* longLiving */
)
if jobs.MakeChangefeedMemoryMetricsHook != nil {
changefeedCurCount, changefeedMaxHist := jobs.MakeChangefeedMemoryMetricsHook(cfg.HistogramWindowInterval())
changefeedMemoryMonitor.SetMetrics(changefeedCurCount, changefeedMaxHist)
}
changefeedMemoryMonitor.StartNoReserved(ctx, rootSQLMemoryMonitor)

serverCacheMemoryMonitor := mon.NewMonitorInheritWithLimit(
"server-cache-mon", 0 /* limit */, rootSQLMemoryMonitor, true, /* longLiving */
)
serverCacheMemoryMonitor.StartNoReserved(context.Background(), rootSQLMemoryMonitor)
serverCacheMemoryMonitor.StartNoReserved(ctx, rootSQLMemoryMonitor)

// Set up the DistSQL temp engine.

Expand Down Expand Up @@ -821,6 +830,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ParentDiskMonitor: cfg.TempStorageConfig.Mon,
BackfillerMonitor: backfillMemoryMonitor,
BackupMonitor: backupMemoryMonitor,
ChangefeedMonitor: changefeedMemoryMonitor,
BulkSenderLimiter: bulkSenderLimiter,

ParentMemoryMonitor: rootSQLMemoryMonitor,
Expand Down
27 changes: 9 additions & 18 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,12 @@ func (ds *ServerImpl) setDraining(drain bool) error {

// setupFlow creates a Flow.
//
// Args:
// reserved: Specifies the upfront memory reservation that the flow takes
// - reserved: specifies the upfront memory reservation that the flow takes
// ownership of. This account is already closed if an error is returned or
// will be closed through Flow.Cleanup.
//
// ownership of. This account is already closed if an error is returned or
// will be closed through Flow.Cleanup.
//
// localState: Specifies if the flow runs entirely on this node and, if it does,
//
// specifies the txn and other attributes.
// - localState: specifies if the flow runs entirely on this node and, if it
// does, specifies the txn and other attributes.
//
// Note: unless an error is returned, the returned context contains a span that
// must be finished through Flow.Cleanup.
Expand Down Expand Up @@ -263,11 +260,10 @@ func (ds *ServerImpl) setupFlow(
}

monitor = mon.NewMonitor(mon.Options{
Name: "flow " + redact.RedactableString(req.Flow.FlowID.Short()),
CurCount: ds.Metrics.CurBytesCount,
MaxHist: ds.Metrics.MaxBytesHist,
Settings: ds.Settings,
LongLiving: localState.MarkFlowMonitorAsLongLiving,
Name: "flow " + redact.RedactableString(req.Flow.FlowID.Short()),
CurCount: ds.Metrics.CurBytesCount,
MaxHist: ds.Metrics.MaxBytesHist,
Settings: ds.Settings,
})
monitor.Start(ctx, parentMonitor, reserved)
diskMonitor = execinfra.NewMonitor(ctx, ds.ParentDiskMonitor, "flow-disk-monitor")
Expand Down Expand Up @@ -560,11 +556,6 @@ type LocalState struct {
// mapping to coldata.Batch, use any to avoid injecting new
// dependencies.
LocalVectorSources map[int32]any

// MarkFlowMonitorAsLongLiving, if set, instructs the DistSQL runner to mark
// the "flow" memory monitor as long-living one, thus exempting it from
// having to be stopped when the txn monitor is stopped.
MarkFlowMonitorAsLongLiving bool
}

// MustUseLeafTxn returns true if a LeafTxn must be used. It is valid to call
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -919,10 +920,9 @@ type PlanningCtx struct {
// This is true if plan is a simple insert that can be vectorized.
isVectorInsert bool

// MarkFlowMonitorAsLongLiving, if set, instructs the DistSQL runner to mark
// the "flow" memory monitor as long-living one, thus exempting it from
// having to be stopped when the txn monitor is stopped.
MarkFlowMonitorAsLongLiving bool
// OverridePlannerMon, if set, will be used instead of the Planner.Mon() as
// the parent monitor for the DistSQL flow.
OverridePlannerMon *mon.BytesMonitor
}

var _ physicalplan.ExprContext = &PlanningCtx{}
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,11 @@ func (dsp *DistSQLPlanner) setupFlows(
batchReceiver = recv
}
origCtx := ctx
ctx, flow, opChains, err := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Planner.Mon(), &setupReq, recv, batchReceiver, localState)
parentMonitor := evalCtx.Planner.Mon()
if planCtx.OverridePlannerMon != nil {
parentMonitor = planCtx.OverridePlannerMon
}
ctx, flow, opChains, err := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, parentMonitor, &setupReq, recv, batchReceiver, localState)
if err == nil && planCtx.saveFlows != nil {
err = planCtx.saveFlows(flows, opChains, planCtx.infra.LocalProcessors, isVectorized)
}
Expand Down Expand Up @@ -696,7 +700,6 @@ func (dsp *DistSQLPlanner) Run(
localState.Txn = txn
localState.LocalProcs = plan.LocalProcessors
localState.LocalVectorSources = plan.LocalVectorSources
localState.MarkFlowMonitorAsLongLiving = planCtx.MarkFlowMonitorAsLongLiving
if planCtx.planner != nil {
// Note that the planner's collection will only be used for local plans.
localState.Collection = planCtx.planner.Descriptors()
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type ServerConfig struct {
// used during restore.
RestoreMonitor *mon.BytesMonitor

// ChangefeedMonitor is the parent monitor for all CDC DistSQL flows.
ChangefeedMonitor *mon.BytesMonitor

// BulkSenderLimiter is the concurrency limiter that is shared across all of
// the processes in a given sql server when sending bulk ingest (AddSST) reqs.
BulkSenderLimiter limit.ConcurrentRequestLimiter
Expand Down

0 comments on commit 68a3285

Please sign in to comment.