sql: limit statistics discard log message
Problem:
The discard log message is emitted at every transaction end once the limit
is hit, flooding the log with discard messages. This is not useful to users
and can cause issues for telemetry pipelines.

Solution:
The discard message is now logged at most once per minute by default. The
log rate is controlled by a cluster setting, so the interval can be set to
a very large value if hitting the limit is expected behavior for a cluster.

Refactored:
Previously, SQLStats created and held the references to the counts, and each
per-app-name container was passed the counts by reference, which made it
non-obvious that the counts are shared between containers. The code was
refactored so that a single object holds the counts and all related state is
passed together. This makes the code easier to read and to extend in the
future if other values need to be added.

Fixes: #110454

Release note (sql change): The discard log message is now limited to
once per minute by default. The message was also changed to include both
the number of transactions and the number of statements that were
discarded.
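
For clusters where hitting the fingerprint limits is expected behavior, the
new interval can simply be raised. A minimal sketch, assuming a client
connection obtained via database/sql (aliased gosql, as in the test below);
only the setting name and its 1m default come from this commit:

// silenceDiscardLogFor raises the discard-log interval so the warning is
// emitted at most once per day. Illustrative helper, not part of this commit.
func silenceDiscardLogFor(db *gosql.DB) error {
	// DiscardedStatsLogInterval defaults to 1m; a very large value
	// effectively silences the warning.
	_, err := db.Exec("SET CLUSTER SETTING sql.metrics.discarded_stats_log.interval = '24h'")
	return err
}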
j82w committed Sep 19, 2023
1 parent 0b2ced7 commit 91af030
Showing 8 changed files with 285 additions and 111 deletions.
63 changes: 63 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
@@ -15,6 +15,7 @@ import (
gosql "database/sql"
"fmt"
"math"
"regexp"
"strconv"
"testing"
"time"
@@ -251,6 +252,68 @@ func TestSQLStatsInitialDelay(t *testing.T) {
"expected latest nextFlushAt to be %s, but found %s", maxNextRunAt, initialNextFlushAt)
}

func TestSQLStatsLogDiscardMessage(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
fakeTime := stubTime{
aggInterval: time.Hour,
}
fakeTime.setTime(timeutil.Now())

var params base.TestServerArgs
params.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{
StubTimeNow: fakeTime.Now,
}
srv, conn, _ := serverutils.StartServer(t, params)
defer srv.Stopper().Stop(ctx)

sqlConn := sqlutils.MakeSQLRunner(conn)

sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.minimum_interval = '10m'")
sqlConn.Exec(t, fmt.Sprintf("SET CLUSTER SETTING sql.metrics.max_mem_stmt_fingerprints=%d", 8))

for i := 0; i < 20; i++ {
appName := fmt.Sprintf("logDiscardTestApp%d", i)
sqlConn.Exec(t, "SET application_name = $1", appName)
sqlConn.Exec(t, "SELECT 1")
}

log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
math.MaxInt64,
10000,
regexp.MustCompile(`statistics discarded due to memory limit. transaction discard count:`),
log.WithFlattenedSensitiveData,
)
require.NoError(t, err)
require.Equal(t, 1, len(entries), "there should only be 1 log for the initial execution because the test should take less than 1 minute to execute the 20 commands. cnt: %v", entries)

// Lower the log interval to verify the message is logged again after the initial one.
sqlConn.Exec(t, "SET CLUSTER SETTING sql.metrics.discarded_stats_log.interval='0.00001ms'")

for i := 0; i < 20; i++ {
appName := fmt.Sprintf("logDiscardTestApp2%d", i)
sqlConn.Exec(t, "SET application_name = $1", appName)
sqlConn.Exec(t, "SELECT 1")
}

log.FlushFiles()

entries, err = log.FetchEntriesFromFiles(
0,
math.MaxInt64,
10000,
regexp.MustCompile(`statistics discarded due to memory limit. transaction discard count:`),
log.WithFlattenedSensitiveData,
)
require.NoError(t, err)
require.GreaterOrEqual(t, len(entries), 1, "there should be at least 1 log after the interval was lowered to allow the message to be logged again. cnt: %v", entries)
}

func TestSQLStatsMinimumFlushInterval(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
43 changes: 13 additions & 30 deletions pkg/sql/sqlstats/sslocal/sql_stats.go
@@ -13,7 +13,6 @@ package sslocal
import (
"context"
"math"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
@@ -31,14 +30,6 @@ import (
type SQLStats struct {
st *cluster.Settings

// uniqueStmtFingerprintLimit is the limit on number of unique statement
// fingerprints we can store in memory.
uniqueStmtFingerprintLimit *settings.IntSetting

// uniqueTxnFingerprintLimit is the limit on number of unique transaction
// fingerprints we can store in memory.
uniqueTxnFingerprintLimit *settings.IntSetting

mu struct {
syncutil.Mutex

@@ -50,15 +41,8 @@ type SQLStats struct {
apps map[string]*ssmemstorage.Container
}

atomic struct {
// uniqueStmtFingerprintCount is the number of unique statement fingerprints
// we are storing in memory.
uniqueStmtFingerprintCount int64

// uniqueTxnFingerprintCount is the number of unique transaction fingerprints
// we are storing in memory.
uniqueTxnFingerprintCount int64
}
// atomic is the set of server-level counters shared by all per-app containers.
atomic *ssmemstorage.SQLStatsAtomicCounters

// flushTarget is a Sink that, when the SQLStats resets at the end of its
// reset interval, the SQLStats will dump all of the stats into if it is not
@@ -93,14 +77,16 @@ func newSQLStats(
st,
)
s := &SQLStats{
st: st,
uniqueStmtFingerprintLimit: uniqueStmtFingerprintLimit,
uniqueTxnFingerprintLimit: uniqueTxnFingerprintLimit,
flushTarget: flushTarget,
knobs: knobs,
insights: insightsWriter,
latencyInformation: latencyInformation,
st: st,
flushTarget: flushTarget,
knobs: knobs,
insights: insightsWriter,
latencyInformation: latencyInformation,
}
s.atomic = ssmemstorage.NewSQLStatsAtomicCounters(
st,
uniqueStmtFingerprintLimit,
uniqueTxnFingerprintLimit)
s.mu.apps = make(map[string]*ssmemstorage.Container)
s.mu.mon = monitor
s.mu.mon.StartNoReserved(context.Background(), parentMon)
@@ -110,7 +96,7 @@
// GetTotalFingerprintCount returns total number of unique statement and
// transaction fingerprints stored in the current SQLStats.
func (s *SQLStats) GetTotalFingerprintCount() int64 {
return atomic.LoadInt64(&s.atomic.uniqueStmtFingerprintCount) + atomic.LoadInt64(&s.atomic.uniqueTxnFingerprintCount)
return s.atomic.GetTotalFingerprintCount()
}

// GetTotalFingerprintBytes returns the total amount of bytes currently
@@ -130,10 +116,7 @@ func (s *SQLStats) getStatsForApplication(appName string) *ssmemstorage.Containe
}
a := ssmemstorage.New(
s.st,
s.uniqueStmtFingerprintLimit,
s.uniqueTxnFingerprintLimit,
&s.atomic.uniqueStmtFingerprintCount,
&s.atomic.uniqueTxnFingerprintCount,
s.atomic,
s.mu.mon,
appName,
s.knobs,
5 changes: 1 addition & 4 deletions pkg/sql/sqlstats/sslocal/sslocal_provider.go
@@ -107,10 +107,7 @@ func (s *SQLStats) GetApplicationStats(appName string, internal bool) sqlstats.A
}
a := ssmemstorage.New(
s.st,
s.uniqueStmtFingerprintLimit,
s.uniqueTxnFingerprintLimit,
&s.atomic.uniqueStmtFingerprintCount,
&s.atomic.uniqueTxnFingerprintCount,
s.atomic,
s.mu.mon,
appName,
s.knobs,
4 changes: 2 additions & 2 deletions pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go
@@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// StatsCollector is used to collect statement and transaction statistics
@@ -106,8 +105,9 @@ func (s *StatsCollector) EndTransaction(
s.ApplicationStats,
)

// Avoid taking locks if no stats are discarded.
if discardedStats > 0 {
log.Warningf(ctx, "%d statement statistics discarded due to memory limit", discardedStats)
s.flushTarget.MaybeLogDiscardMessage(ctx)
}

s.ApplicationStats.Free(ctx)
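
The MaybeLogDiscardMessage wrapper invoked above is defined in one of the
changed files not expanded in this view. A hedged sketch of what it presumably
does, assuming the per-app Container stores the shared counters in a field
here called atomicCounters (the receiver, field name, and delegation are
assumptions, not confirmed by this diff):

// MaybeLogDiscardMessage forwards to the shared server-level counters, which
// apply the once-per-interval rate limit before logging. Sketch only.
func (s *Container) MaybeLogDiscardMessage(ctx context.Context) {
	s.atomicCounters.maybeLogDiscardMessage(ctx)
}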
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/ssmemstorage/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "ssmemstorage",
srcs = [
"ss_mem_counter.go",
"ss_mem_iterator.go",
"ss_mem_storage.go",
"ss_mem_writer.go",
177 changes: 177 additions & 0 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_counter.go
@@ -0,0 +1,177 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package ssmemstorage is a subsystem that is responsible for tracking the
// in-memory statistics of statements and transactions.

package ssmemstorage

import (
"context"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// SQLStatsAtomicCounters holds the server-level fingerprint limits and
// counts that are shared by all per-app containers.
type SQLStatsAtomicCounters struct {
st *cluster.Settings

// UniqueStmtFingerprintLimit is the limit on the number of unique statement
// fingerprints we can store in memory.
UniqueStmtFingerprintLimit *settings.IntSetting

// UniqueTxnFingerprintLimit is the limit on the number of unique transaction
// fingerprints we can store in memory.
UniqueTxnFingerprintLimit *settings.IntSetting

// uniqueStmtFingerprintCount is the number of unique statement fingerprints
// we are storing in memory.
uniqueStmtFingerprintCount int64

// uniqueTxnFingerprintCount is the number of unique transaction fingerprints
// we are storing in memory.
uniqueTxnFingerprintCount int64

// discardUniqueStmtFingerprintCount is the number of unique statement
// fingerprints that are discarded because of memory limitations.
discardUniqueStmtFingerprintCount int64

// discardUniqueTxnFingerprintCount is the number of unique transaction
// fingerprints that are discarded because of memory limitations.
discardUniqueTxnFingerprintCount int64

mu struct {
syncutil.Mutex

// lastDiscardLogMessageSent is the last time a log message was sent for
// statistics being discarded because of memory pressure.
lastDiscardLogMessageSent time.Time
}
}

// DiscardedStatsLogInterval specifies the interval between log emissions for discarded
// statement and transaction statistics due to reaching the SQL statistics memory limit.
var DiscardedStatsLogInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"sql.metrics.discarded_stats_log.interval",
"interval between log emissions for discarded statistics due to SQL statistics memory limit",
1*time.Minute,
settings.NonNegativeDuration,
settings.WithVisibility(settings.Reserved))

func NewSQLStatsAtomicCounters(
st *cluster.Settings,
uniqueStmtFingerprintLimit *settings.IntSetting,
uniqueTxnFingerprintLimit *settings.IntSetting,
) *SQLStatsAtomicCounters {
return &SQLStatsAtomicCounters{
st: st,
UniqueStmtFingerprintLimit: uniqueStmtFingerprintLimit,
UniqueTxnFingerprintLimit: uniqueTxnFingerprintLimit,
}
}

// maybeLogDiscardMessage logs a warning if statement or transaction
// fingerprints were discarded because of memory limits and enough time passed
// since the last time the warning was logged. This is necessary to avoid
// flooding the log with warnings once the limit is hit.
func (s *SQLStatsAtomicCounters) maybeLogDiscardMessage(ctx context.Context) {
discardStmtCnt := atomic.LoadInt64(&s.discardUniqueStmtFingerprintCount)
discardTxnCnt := atomic.LoadInt64(&s.discardUniqueTxnFingerprintCount)
if discardStmtCnt == 0 && discardTxnCnt == 0 {
return
}

// Get the config values before the lock to reduce time in the lock.
discardLogInterval := DiscardedStatsLogInterval.Get(&s.st.SV)
stmtLimit := s.UniqueStmtFingerprintLimit.Get(&s.st.SV)
txnLimit := s.UniqueTxnFingerprintLimit.Get(&s.st.SV)
s.mu.Lock()
defer s.mu.Unlock()
timeNow := timeutil.Now()

// Not enough time has passed since the last log message was sent.
if timeNow.Sub(s.mu.lastDiscardLogMessageSent) < discardLogInterval {
return
}

// The discard counts might be slightly off because it's possible that the
// count changed after the initial load and before the log message is sent.
// The count being slightly off won't impact users looking at the message. It
// also avoids holding a lock on the counts which would block requests until
// the log is sent.
log.Warningf(ctx, "statistics discarded due to memory limit. transaction discard count: %d with limit: %d, statement discard count: %d with limit: %d, logged at interval: %s, last logged: %s",
discardTxnCnt, txnLimit, discardStmtCnt, stmtLimit, discardLogInterval, s.mu.lastDiscardLogMessageSent)
s.mu.lastDiscardLogMessageSent = timeNow

// Reset the discard counts back to 0 since the values were logged.
atomic.StoreInt64(&s.discardUniqueStmtFingerprintCount, int64(0))
atomic.StoreInt64(&s.discardUniqueTxnFingerprintCount, int64(0))
}

// tryAddStmtFingerprint attempts to add 1 to the server level count for
// statement level fingerprints and returns false if it is being throttled.
func (s *SQLStatsAtomicCounters) tryAddStmtFingerprint() (ok bool) {
limit := s.UniqueStmtFingerprintLimit.Get(&s.st.SV)

// We check if we have reached the limit of unique fingerprints we can
// store.
incrementedFingerprintCount :=
atomic.AddInt64(&s.uniqueStmtFingerprintCount, int64(1) /* delta */)

// Abort if we have exceeded the limit of unique statement fingerprints.
if incrementedFingerprintCount < limit {
return true
}

atomic.AddInt64(&s.discardUniqueStmtFingerprintCount, int64(1))
atomic.AddInt64(&s.uniqueStmtFingerprintCount, -int64(1) /* delta */)
return false
}

// tryAddTxnFingerprint attempts to add 1 to the server level count for
// transaction level fingerprints and returns false if it is being throttled.
func (s *SQLStatsAtomicCounters) tryAddTxnFingerprint() (ok bool) {
limit := s.UniqueTxnFingerprintLimit.Get(&s.st.SV)

// We check if we have reached the limit of unique fingerprints we can
// store.
incrementedFingerprintCount :=
atomic.AddInt64(&s.uniqueTxnFingerprintCount, int64(1) /* delta */)

if incrementedFingerprintCount < limit {
return true
}

atomic.AddInt64(&s.discardUniqueTxnFingerprintCount, int64(1))
atomic.AddInt64(&s.uniqueTxnFingerprintCount, -int64(1) /* delta */)
return false
}

// freeByCnt decrements the statement and transaction counts by the values
// passed in. This is used when an entire per-app-name container is being
// cleaned up.
func (s *SQLStatsAtomicCounters) freeByCnt(
uniqueStmtFingerprintCount, uniqueTxnFingerprintCount int64,
) {
atomic.AddInt64(&s.uniqueStmtFingerprintCount, uniqueStmtFingerprintCount)
atomic.AddInt64(&s.uniqueTxnFingerprintCount, uniqueTxnFingerprintCount)
}

// GetTotalFingerprintCount returns total number of unique statement and
// transaction fingerprints stored in the current SQLStats.
func (s *SQLStatsAtomicCounters) GetTotalFingerprintCount() int64 {
return atomic.LoadInt64(&s.uniqueStmtFingerprintCount) + atomic.LoadInt64(&s.uniqueTxnFingerprintCount)
}
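
For context, a sketch of how a per-app container in this package might consume
these counters when recording a new statement fingerprint. The container
internals are not part of this excerpt, so the type, field, and error below
are hypothetical; only tryAddStmtFingerprint and the rate-limited warning come
from this commit:

// exampleContainer is illustrative only (not part of this commit). It must
// live in package ssmemstorage because tryAddStmtFingerprint is unexported.
// Assumes the standard library "errors" package is imported.
type exampleContainer struct {
	counters *SQLStatsAtomicCounters
}

var errFingerprintLimitReached = errors.New("fingerprint memory limit reached")

// recordNewStmtFingerprint reserves room for one more statement fingerprint
// before the container stores a new in-memory statistics entry.
func (c *exampleContainer) recordNewStmtFingerprint() error {
	if !c.counters.tryAddStmtFingerprint() {
		// Over the limit: tryAddStmtFingerprint already bumped the discard
		// counter, and the warning itself is rate limited by
		// maybeLogDiscardMessage at transaction end.
		return errFingerprintLimitReached
	}
	// ... create and store the statement statistics entry ...
	return nil
}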
