Skip to content

Commit

Permalink
sql: introduce persistedsqlstats subsystem
Browse files Browse the repository at this point in the history
This commit introduces a new persisted sql stats subsystem
that wraps the existing node-local sql stats subsystem.
This new subsystem is responsible for flushing the in-memory
statistics into the system table periodically, or when it
detects memory pressure.
This replaces sql.Server's in-memory sqlStats provider.

Release note (sql change): SQL stats now can be persisted into
system.statement_statistics and system.transaction_statistics
tables by enabling the sql.stats.flush.enabled cluster setting.
The interval of persistence is determined by the new
sql.stats.flush.interval cluster setting which defaults to 1 hour.
  • Loading branch information
Azhng committed Aug 6, 2021
1 parent 867985f commit 9104fd1
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Expand Up @@ -129,6 +129,7 @@ sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enable
sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode
sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh
sql.stats.automatic_collection.min_stale_rows integer 500 target minimum number of stale rows per table that will trigger a statistics refresh
sql.stats.flush.enabled boolean true if set, SQL execution statistics are periodically flushed to disk
sql.stats.flush.interval duration 1h0m0s the interval at which SQL execution statistics are flushed to disk
sql.stats.histogram_collection.enabled boolean true histogram collection mode
sql.stats.multi_column_collection.enabled boolean true multi-column statistics collection mode
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Expand Up @@ -133,6 +133,7 @@
<tr><td><code>sql.stats.automatic_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>automatic statistics collection mode</td></tr>
<tr><td><code>sql.stats.automatic_collection.fraction_stale_rows</code></td><td>float</td><td><code>0.2</code></td><td>target fraction of stale rows per table that will trigger a statistics refresh</td></tr>
<tr><td><code>sql.stats.automatic_collection.min_stale_rows</code></td><td>integer</td><td><code>500</code></td><td>target minimum number of stale rows per table that will trigger a statistics refresh</td></tr>
<tr><td><code>sql.stats.flush.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, SQL execution statistics are periodically flushed to disk</td></tr>
<tr><td><code>sql.stats.flush.interval</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the interval at which SQL execution statistics are flushed to disk</td></tr>
<tr><td><code>sql.stats.histogram_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>histogram collection mode</td></tr>
<tr><td><code>sql.stats.multi_column_collection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>multi-column statistics collection mode</td></tr>
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/statement_statistics
@@ -1,5 +1,11 @@
# LogicTest: local !3node-tenant(52763)

# Disable SQL Stats flush to prevent stats from being cleared from the
# in-memory store.

statement ok
SET CLUSTER SETTING sql.stats.flush.enabled = false;

# Check that node_statement_statistics report per application

statement ok
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Expand Up @@ -7,6 +7,7 @@ go_library(
"flush.go",
"provider.go",
"test_utils.go",
"writer.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats",
visibility = ["//visibility:public"],
Expand All @@ -17,11 +18,13 @@ go_library(
"//pkg/security",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/execstats",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
"//pkg/sql/sqlstats/sslocal",
"//pkg/sql/sqlstats/ssmemstorage",
"//pkg/sql/sqlutil",
"//pkg/util/log",
"//pkg/util/metric",
Expand Down
26 changes: 26 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/errors"
)

// SQLStatsFlushInterval is the cluster setting that controls how often the SQL
Expand All @@ -24,3 +25,28 @@ var SQLStatsFlushInterval = settings.RegisterDurationSetting(
time.Hour,
settings.NonNegativeDurationWithMaximum(time.Hour*24),
).WithPublic()

// SQLStatsFlushEnabled is the cluster setting that controls whether the
// sqlstats subsystem persists the statistics into the system tables. It is
// registered under the name "sql.stats.flush.enabled", defaults to true, and
// is marked public.
var SQLStatsFlushEnabled = settings.RegisterBoolSetting(
"sql.stats.flush.enabled",
"if set, SQL execution statistics are periodically flushed to disk",
true, /* defaultValue */
).WithPublic()

// SQLStatsFlushJitter specifies the jitter fraction on the interval between
// attempts to flush SQL Stats. Each wait between flushes falls within
//
//	[(1 - SQLStatsFlushJitter) * SQLStatsFlushInterval,
//	 (1 + SQLStatsFlushJitter) * SQLStatsFlushInterval]
//
// The setting defaults to 0.15 and only accepts values in [0, 1].
var SQLStatsFlushJitter = settings.RegisterFloatSetting(
	"sql.stats.flush.jitter",
	"jitter fraction on the duration between sql stats flushes",
	0.15,
	func(f float64) error {
		// Written as a negated range check so that NaN, for which every
		// ordered comparison evaluates to false, is rejected as well. The
		// previous form (f < 0 || f > 1) silently accepted NaN.
		if !(f >= 0 && f <= 1) {
			return errors.Newf("%f is not in [0, 1]", f)
		}
		return nil
	},
)
71 changes: 69 additions & 2 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Expand Up @@ -15,6 +15,7 @@ package persistedsqlstats

import (
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// TODO(azhng): currently we do not have the ability to compute a hash for
Expand Down Expand Up @@ -55,6 +57,12 @@ type PersistedSQLStats struct {

cfg *Config

// memoryPressureSignal is used by the persistedsqlstats.StatsWriter to signal
// memory pressure during stats recording. A signal is emitted through this
// channel either if the fingerprint limit or the memory limit has been
// exceeded.
memoryPressureSignal chan struct{}

lastFlushStarted time.Time
}

Expand All @@ -63,12 +71,71 @@ var _ sqlstats.Provider = &PersistedSQLStats{}
// New returns a new instance of the PersistedSQLStats that wraps the given
// in-memory SQL stats provider.
//
// cfg is stored as-is and memSQLStats is used as the embedded SQLStats. The
// memory pressure signal channel is created unbuffered.
func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats {
	// Each field may appear only once in a composite literal; listing
	// SQLStats and cfg twice is rejected by the compiler.
	return &PersistedSQLStats{
		SQLStats:             memSQLStats,
		cfg:                  cfg,
		memoryPressureSignal: make(chan struct{}),
	}
}

// Start implements sqlstats.Provider interface. It first starts the wrapped
// in-memory SQLStats provider and then launches the background loop that
// flushes SQL stats.
func (s *PersistedSQLStats) Start(ctx context.Context, stopper *stop.Stopper) {
s.SQLStats.Start(ctx, stopper)
s.startSQLStatsFlushLoop(ctx, stopper)
}

// startSQLStatsFlushLoop launches the "sql-stats-worker" async task on the
// given stopper. The task calls s.Flush whenever the jittered flush timer
// fires or a memory pressure signal is received, provided that
// SQLStatsFlushEnabled is set. A change to SQLStatsFlushInterval restarts the
// wait with a freshly computed interval without flushing. The task returns
// once the stopper starts quiescing.
func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper *stop.Stopper) {
	// NOTE(review): the error returned by RunAsyncTask is deliberately
	// dropped; presumably it is only non-nil when the stopper is already
	// shutting down, in which case there is nothing left to do — confirm.
	_ = stopper.RunAsyncTask(ctx, "sql-stats-worker", func(ctx context.Context) {
		// Capacity of one lets the OnChange callback below stay non-blocking
		// while still guaranteeing that at least one notification is queued.
		resetIntervalChanged := make(chan struct{}, 1)

		SQLStatsFlushInterval.SetOnChange(&s.cfg.Settings.SV, func(ctx context.Context) {
			select {
			case resetIntervalChanged <- struct{}{}:
			default:
			}
		})

		timer := timeutil.NewTimer()
		// Release the timer when the worker exits.
		defer timer.Stop()

		for {
			// Arm the timer at the top of every iteration, including the very
			// first one. Previously the timer was only reset in the for-loop's
			// post statement, so during the first iteration it had never been
			// armed and the periodic flush could not fire until one of the
			// other events restarted the loop.
			timer.Reset(s.nextFlushInterval())

			select {
			case <-timer.C:
				timer.Read = true
			case <-s.memoryPressureSignal:
				// We are experiencing memory pressure, so we flush SQL stats to
				// disk immediately, rather than waiting the full flush
				// interval, in an attempt to relieve some of that pressure.
			case <-resetIntervalChanged:
				// Restart the loop without performing any flush so that the
				// wait is recomputed from the new flush interval.
				continue
			case <-stopper.ShouldQuiesce():
				return
			}

			if SQLStatsFlushEnabled.Get(&s.cfg.Settings.SV) {
				s.Flush(ctx)
			}
		}
	})
}

// nextFlushInterval returns how long to wait before the next flush attempt.
// The result is SQLStatsFlushInterval scaled by a random factor drawn from
//
//	[1 - SQLStatsFlushJitter, 1 + SQLStatsFlushJitter)
func (s *PersistedSQLStats) nextFlushInterval() time.Duration {
	sv := &s.cfg.Settings.SV
	interval := SQLStatsFlushInterval.Get(sv)
	jitterFrac := SQLStatsFlushJitter.Get(sv)

	// rand.Float64 yields a value in [0, 1); 2x-1 maps it onto [-1, 1).
	scale := 1 + (2*rand.Float64()-1)*jitterFrac
	return time.Duration(scale * float64(interval.Nanoseconds()))
}

// GetWriterForApplication implements sqlstats.Provider interface. The
// returned writer wraps the in-memory writer for appName and shares this
// provider's memory pressure signal channel.
func (s *PersistedSQLStats) GetWriterForApplication(appName string) sqlstats.Writer {
	return &StatsWriter{
		memWriter:            s.SQLStats.GetWriterForApplication(appName),
		memoryPressureSignal: s.memoryPressureSignal,
	}
}
90 changes: 90 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/writer.go
@@ -0,0 +1,90 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package persistedsqlstats

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/ssmemstorage"
"github.com/cockroachdb/errors"
)

// StatsWriter is a sqlstats.Writer that wraps an in-memory node-local stats
// writer. StatsWriter signals the subsystem when it encounters memory
// pressure, which triggers the flush operation.
type StatsWriter struct {
// memWriter is the local in-memory storage that calls are delegated to.
memWriter sqlstats.Writer

// memoryPressureSignal is used to signal that the stats writer is
// experiencing memory pressure.
memoryPressureSignal chan struct{}
}

// Compile-time check that *StatsWriter satisfies sqlstats.Writer.
var _ sqlstats.Writer = &StatsWriter{}

// RecordStatement implements sqlstats.Writer interface. The statement is
// recorded through the wrapped in-memory writer; the recording goes through
// recordStatsOrSendMemoryPressureSignal, which decides what error, if any, is
// reported to the caller.
func (s *StatsWriter) RecordStatement(
	ctx context.Context, key roachpb.StatementStatisticsKey, value sqlstats.RecordedStmtStats,
) (roachpb.StmtFingerprintID, error) {
	var id roachpb.StmtFingerprintID
	record := func() error {
		var recordErr error
		id, recordErr = s.memWriter.RecordStatement(ctx, key, value)
		return recordErr
	}
	if err := s.recordStatsOrSendMemoryPressureSignal(record); err != nil {
		return id, err
	}
	return id, nil
}

// RecordStatementExecStats implements sqlstats.Writer interface. The call is
// delegated directly to the wrapped in-memory writer and its error is
// returned unchanged; no memory pressure signal is sent from here.
func (s *StatsWriter) RecordStatementExecStats(
key roachpb.StatementStatisticsKey, stats execstats.QueryLevelStats,
) error {
return s.memWriter.RecordStatementExecStats(key, stats)
}

// ShouldSaveLogicalPlanDesc implements sqlstats.Writer interface. The
// decision is delegated entirely to the wrapped in-memory writer.
func (s *StatsWriter) ShouldSaveLogicalPlanDesc(
fingerprint string, implicitTxn bool, database string,
) bool {
return s.memWriter.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database)
}

// RecordTransaction implements sqlstats.Writer interface and saves
// per-transaction statistics through the wrapped in-memory writer. The
// recording goes through recordStatsOrSendMemoryPressureSignal, which decides
// what error, if any, is reported to the caller.
func (s *StatsWriter) RecordTransaction(
	ctx context.Context, key roachpb.TransactionFingerprintID, value sqlstats.RecordedTxnStats,
) error {
	record := func() error {
		return s.memWriter.RecordTransaction(ctx, key, value)
	}
	return s.recordStatsOrSendMemoryPressureSignal(record)
}

// recordStatsOrSendMemoryPressureSignal runs fn and inspects its error. When
// the error is ssmemstorage.ErrFingerprintLimitReached or
// ssmemstorage.ErrMemoryPressure, a non-blocking memory pressure signal is
// sent and nil is returned. Any other result of fn is returned unchanged.
func (s *StatsWriter) recordStatsOrSendMemoryPressureSignal(fn func() error) error {
	recordErr := fn()

	underPressure := errors.Is(recordErr, ssmemstorage.ErrFingerprintLimitReached) ||
		errors.Is(recordErr, ssmemstorage.ErrMemoryPressure)
	if !underPressure {
		return recordErr
	}

	// The send is non-blocking. If it succeeds, the flush worker has been
	// notified and our job is done. If it does not go through, that means we
	// are already experiencing memory pressure and the stats-flush-worker has
	// already started to handle the flushing, so nothing more is needed here.
	select {
	case s.memoryPressureSignal <- struct{}{}:
	default:
	}

	// The memory pressure error has been handled at this point, so it is not
	// bubbled up any further.
	return nil
}

0 comments on commit 9104fd1

Please sign in to comment.