Skip to content

Commit

Permalink
pkg/sql: export sql.aggregated_livebytes metric for out-of-process te…
Browse files Browse the repository at this point in the history
…nants

Previously, in order to obtain livebytes metrics for tenants, one would need
to query such values via the KV servers, and this can be problematic if we
only have access to just the SQL servers. For example, in CockroachDB Cloud,
only metrics from the SQL servers are exported to end-users, and is done so
directly from the cockroachdb process. It is not trivial to export an
additional subset of metrics from the KV servers filtered by tenant ID.

To address that, this commit exposes livebytes for tenants directly via an
aggregated metric on the SQL nodes. The aggregated metric will be updated
every 60 seconds by default, and will be exported via the existing MVCC
statistics update job. Unlike other job metrics where metrics are registered
at initialization time and stays forever, this aggregated metric is tied to
the lifespan of the job (i.e. it is only exported if the job is running, and
unexported otherwise).

This feature is scoped to standalone SQL servers only, which at this point of
writing, is only supported in CockroachDB Cloud. If we wanted to backport this
into 23.2, it should be straightforward as well since the permanent upgrade
to insert the job is already in release-23.2.

Fixes: #119139

Epic: none

Release note (sql change): Out-of-process SQL servers will start exporting a
new sql.aggregated_livebytes metric. This metric gets updated once every 60
seconds by default, and its update interval can be configured via the
`tenant_global_metrics_exporter_interval` cluster setting.
  • Loading branch information
jaylim-crl committed Feb 13, 2024
1 parent 25f20a9 commit 7260245
Show file tree
Hide file tree
Showing 7 changed files with 425 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ const (
// SqlActivityUpdaterJobID A static job ID is used for the SQL activity tables.
SqlActivityUpdaterJobID = jobspb.JobID(103)

// MVCCStatisticsJobID A static job ID used for the MVCC statistics update
// job.
MVCCStatisticsJobID = jobspb.JobID(104)
)

Expand Down
5 changes: 5 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2087,3 +2087,8 @@ func (s *SQLServer) ExecutorConfig() *sql.ExecutorConfig {
func (s *SQLServer) InternalExecutor() isql.Executor {
return s.internalExecutor
}

// MetricsRegistry returns the application-level metrics registry.
func (s *SQLServer) MetricsRegistry() *metric.Registry {
return s.metricsRegistry
}
7 changes: 7 additions & 0 deletions pkg/server/status/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ func (mr *MetricsRecorder) RemoveTenantRegistry(tenantID roachpb.TenantID) {
delete(mr.mu.tenantRegistries, tenantID)
}

// AppRegistry returns the metric registry for application-level metrics.
func (mr *MetricsRecorder) AppRegistry() *metric.Registry {
mr.mu.Lock()
defer mr.mu.Unlock()
return mr.mu.appRegistry
}

// AddNode adds various metric registries an initialized server, along
// with its descriptor and start time.
// The registries are:
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ go_test(
"multitenant_admin_function_test.go",
"mutation_test.go",
"mvcc_backfiller_test.go",
"mvcc_statistics_update_job_test.go",
"normalization_test.go",
"pg_metadata_test.go",
"pg_oid_test.go",
Expand Down Expand Up @@ -788,6 +789,7 @@ go_test(
"//pkg/server/serverpb",
"//pkg/server/settingswatcher",
"//pkg/server/srvtestutils",
"//pkg/server/status",
"//pkg/server/status/statuspb",
"//pkg/server/telemetry",
"//pkg/settings",
Expand Down Expand Up @@ -884,6 +886,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/ts",
"//pkg/upgrade/upgradebase",
"//pkg/util",
"//pkg/util/admission",
Expand Down Expand Up @@ -936,6 +939,7 @@ go_test(
"@com_github_lib_pq//oid",
"@com_github_petermattis_goid//:goid",
"@com_github_pmezard_go_difflib//difflib",
"@com_github_prometheus_common//expfmt",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@in_gopkg_yaml_v2//:yaml_v2",
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,10 +1177,11 @@ type NodeInfo struct {
PGURL func(*url.Userinfo) (*pgurl.URL, error)
}

// nodeStatusGenerator is a limited portion of the status.MetricsRecorder
// limitedMetricsRecorder is a limited portion of the status.MetricsRecorder
// struct, to avoid having to import all of status in sql.
type nodeStatusGenerator interface {
type limitedMetricsRecorder interface {
GenerateNodeStatus(ctx context.Context) *statuspb.NodeStatus
AppRegistry() *metric.Registry
}

// SystemTenantOnly wraps an object in the ExecutorConfig that is only
Expand Down Expand Up @@ -1249,7 +1250,7 @@ type ExecutorConfig struct {
// available when not running as a system tenant.
SQLStatusServer serverpb.SQLStatusServer
TenantStatusServer serverpb.TenantStatusServer
MetricsRecorder nodeStatusGenerator
MetricsRecorder limitedMetricsRecorder
SessionRegistry *SessionRegistry
ClosedSessionCache *ClosedSessionCache
SQLLiveness sqlliveness.Provider
Expand Down
130 changes: 120 additions & 10 deletions pkg/sql/mvcc_statistics_update_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,141 @@ package sql

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

// TenantGlobalMetricsExporterInterval is the interval at which an external
// tenant's process in the cluster will update the global metrics. This is
// exported for testing purposes.
var TenantGlobalMetricsExporterInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"tenant_global_metrics_exporter_interval",
"the interval at which a node in the cluster will update the exported global metrics",
60*time.Second,
settings.PositiveDuration,
)

// mvccStatisticsUpdateJob is a singleton job that is meant to update MVCC
// statistics. Historically, this was added to update system.mvcc_statistics,
// but the project was deprioritized. Currently, this is used by external
// process tenants to export global metrics periodically. For such metrics,
// they will only be present on a SQL node if the job is running. Once the job
// stops, the metrics will be removed from the metric registry.
type mvccStatisticsUpdateJob struct {
job *jobs.Job
st *cluster.Settings

// dynamicMetrics keep track of metrics which are added/removed dynamically
// as the job runs. Unlike regular job metrics (i.e. WithJobMetrics), which
// are registered when the job starts the first time, and never removed from
// the metric registry, metrics in this list should be removed when the job
// is not running.
dynamicMetrics struct {
livebytes *metric.Gauge
}
}

var _ jobs.Resumer = (*mvccStatisticsUpdateJob)(nil)

func (j *mvccStatisticsUpdateJob) Resume(ctx context.Context, execCtxI interface{}) (jobErr error) {
// Resume implements the jobs.Resumer interface.
func (j *mvccStatisticsUpdateJob) Resume(ctx context.Context, execCtxI interface{}) error {
log.Infof(ctx, "starting mvcc statistics update job")

// This job is a forever running background job, and it is always safe to
// terminate the SQL pod whenever the job is running, so mark it as idle.
j.job.MarkIdle(true)

execCtx := execCtxI.(JobExecContext)

// Export global metrics for tenants if this is an out-of-process SQL node.
// All external mode tenant servers have no node IDs.
if _, hasNodeID := execCtx.ExecCfg().NodeInfo.NodeID.OptionalNodeID(); !hasNodeID {
return j.runTenantGlobalMetricsExporter(ctx, execCtx)
}

// TODO(zachlite):
// Delete samples older than configurable setting...
// Collect span stats for tenant descriptors...
// Write new samples...
execCtx := execCtxI.(JobExecContext)
stopper := execCtx.ExecCfg().DistSQLSrv.Stopper
j.job.MarkIdle(true)

// Block until context is cancelled since there's nothing that needs to be
// done here. We should not return nil, or else the job will be marked as
// succeeded.
<-ctx.Done()
return ctx.Err()
}

// runTenantGlobalMetricsExporter executes the logic to export global metrics
// for tenants.
func (j *mvccStatisticsUpdateJob) runTenantGlobalMetricsExporter(
ctx context.Context, execCtx JobExecContext,
) error {
metricsRegistry := execCtx.ExecCfg().MetricsRecorder.AppRegistry()

initialRun := true
defer func() {
metricsRegistry.RemoveMetric(j.dynamicMetrics.livebytes)
}()

runTask := func() error {
resp, err := execCtx.ExecCfg().TenantStatusServer.SpanStats(
ctx,
&roachpb.SpanStatsRequest{
// Fan out to all nodes. SpanStats takes care of only contacting
// the relevant nodes with the tenant's span.
NodeID: "0",
Spans: []roachpb.Span{execCtx.ExecCfg().Codec.TenantSpan()},
},
)
if err != nil {
return err
}
var total int64
for _, stats := range resp.SpanToStats {
total += stats.ApproximateTotalStats.LiveBytes
}
j.dynamicMetrics.livebytes.Update(total)

// Only register metrics once we get our initial values. This avoids
// metrics from fluctuating whenever the job restarts.
if initialRun {
metricsRegistry.AddMetric(j.dynamicMetrics.livebytes)
initialRun = false
}
return nil
}

timer := timeutil.NewTimer()
defer timer.Stop()

// Fire the timer immediately to start the initial update.
timer.Reset(0)

for {
select {
case <-ctx.Done():
return nil

case <-stopper.ShouldQuiesce():
return nil
return ctx.Err()
case <-timer.C:
timer.Read = true
timer.Reset(TenantGlobalMetricsExporterInterval.Get(&execCtx.ExecCfg().Settings.SV))
if err := runTask(); err != nil {
log.Errorf(ctx, "mvcc statistics update job error: %v", err)
}
}
}
}

// OnFailOrCancel implements the jobs.Resumer interface.
func (j *mvccStatisticsUpdateJob) OnFailOrCancel(
ctx context.Context, _ interface{}, jobErr error,
) error {
Expand All @@ -63,14 +164,23 @@ func (j *mvccStatisticsUpdateJob) OnFailOrCancel(
return nil
}

// CollectProfile implements the jobs.Resumer interface.
func (j *mvccStatisticsUpdateJob) CollectProfile(_ context.Context, _ interface{}) error {
return nil
}

func init() {
jobs.RegisterConstructor(jobspb.TypeMVCCStatisticsUpdate,
jobs.RegisterConstructor(
jobspb.TypeMVCCStatisticsUpdate,
func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return &mvccStatisticsUpdateJob{job: job}
exporter := &mvccStatisticsUpdateJob{job: job, st: settings}
exporter.dynamicMetrics.livebytes = metric.NewGauge(metric.Metadata{
Name: "sql.aggregated_livebytes",
Help: "Aggregated number of bytes of live data (keys plus values)",
Measurement: "Storage",
Unit: metric.Unit_BYTES,
})
return exporter
},
jobs.DisablesTenantCostControl,
)
Expand Down

0 comments on commit 7260245

Please sign in to comment.