Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqlstats: fix insights memleak when disabling sql.metrics.transaction_details.enabled #117709

Merged
merged 4 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ ALL_TESTS = [
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil:sqlstatsutil_test",
"//pkg/sql/sqlstats/persistedsqlstats:persistedsqlstats_test",
"//pkg/sql/sqlstats/sslocal:sslocal_test",
"//pkg/sql/sqlstats/ssmemstorage:ssmemstorage_test",
"//pkg/sql/sqltestutils:sqltestutils_test",
"//pkg/sql/stats:stats_test",
"//pkg/sql/stmtdiagnostics:stmtdiagnostics_test",
Expand Down Expand Up @@ -2165,6 +2166,7 @@ GO_TARGETS = [
"//pkg/sql/sqlstats/sslocal:sslocal",
"//pkg/sql/sqlstats/sslocal:sslocal_test",
"//pkg/sql/sqlstats/ssmemstorage:ssmemstorage",
"//pkg/sql/sqlstats/ssmemstorage:ssmemstorage_test",
"//pkg/sql/sqlstats:sqlstats",
"//pkg/sql/sqltelemetry:sqltelemetry",
"//pkg/sql/sqltestutils:sqltestutils",
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@ func NewServer(
metrics := makeMetrics(false /* internal */)
serverMetrics := makeServerMetrics(cfg)
insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics, eventsExporter)
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.
sqlstats.TxnStatsEnable.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
if !sqlstats.TxnStatsEnable.Get(&cfg.Settings.SV) {
insightsProvider.Writer(false /*internal*/).Clear(ctx)
}
})
reportedSQLStats := sslocal.New(
cfg.Settings,
sqlstats.MaxMemReportedSQLStatsStmtFingerprints,
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/contention/contentionutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ go_test(
srcs = ["concurrent_buffer_guard_test.go"],
embed = [":contentionutils"],
deps = [
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/uuid",
"@com_github_stretchr_testify//require",
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/contention/contentionutils/concurrent_buffer_guard.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ func (c *ConcurrentBufferGuard) ForceSync() {
c.syncLocked()
}

// ForceSyncExec blocks all inflight and upcoming write operations, to allow
// the onBufferFullHandler to be executed. However, unlike ForceSync,
// ForceSyncExec executes the provided function prior to executing
// onBufferFullHandler, which allows callers to atomically apply state to this
// specific execution of the onBufferFullHandler.
//
// fn is invoked while flushSyncLock is held by this call and must be non-nil.
// NOTE(review): fn presumably must not call back into methods of this guard
// that acquire flushSyncLock, since the lock is already held here — confirm
// against the lock's type.
func (c *ConcurrentBufferGuard) ForceSyncExec(fn func()) {
c.flushSyncLock.Lock()
defer c.flushSyncLock.Unlock()
// Run the caller's hook first so that the sync below observes its effects.
fn()
c.syncLocked()
}

func (c *ConcurrentBufferGuard) syncRLocked() {
// We upgrade the read-lock to a write-lock, then when we are done flushing,
// the lock is downgraded to a read-lock.
Expand Down
29 changes: 29 additions & 0 deletions pkg/sql/contention/contentionutils/concurrent_buffer_guard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -117,6 +119,33 @@ func TestConcurrentWriterGuard(t *testing.T) {
}
}

// TestConcurrentBufferGuard_ForceSyncExec verifies the ordering contract of
// ForceSyncExec: the caller-supplied function must have run by the time the
// guard's onBufferFullSync callback is invoked.
func TestConcurrentBufferGuard_ForceSyncExec(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	t.Run("executes function prior to onBufferFullSync", func(t *testing.T) {
		// hookRan is flipped by the function handed to ForceSyncExec.
		// hookRanBeforeSync records what the sync callback saw at the
		// moment it executed, which is what we assert on below.
		var hookRan, hookRanBeforeSync bool

		limiter := func() int64 {
			return 1024 // 1K
		}
		onBufferFullSync := func(_ int64) {
			hookRanBeforeSync = hookRan
		}
		guard := NewConcurrentBufferGuard(limiter, onBufferFullSync)

		// If the callback ran before the hook (or never ran at all), the
		// snapshot stays false and the assertion fails.
		guard.ForceSyncExec(func() {
			hookRan = true
		})
		require.True(t, hookRanBeforeSync)
	})
}

func runConcurrentWriterGuard(t *testing.T, concurrentWriters int, sizeLimit int64) {
start := make(chan struct{})
buf := newTestBuffer(sizeLimit)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ func (ih *instrumentationHelper) Setup(

// Don't collect it if Stats Collection is disabled. If it is disabled the
// stats are not stored, so it always returns false for previouslySampled.
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.
if !collectTxnExecStats && (!previouslySampled && sqlstats.StmtStatsEnable.Get(&cfg.Settings.SV)) {
// We don't collect the execution stats for statements in this txn, but
// this is the first time we see this statement ever, so we'll collect
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessionphase",
"//pkg/sql/sqlstats/insights",
"//pkg/util/stop",
"//pkg/util/uuid",
],
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlstats/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
)

// StmtStatsEnable determines whether to collect per-statement statistics.
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.
var StmtStatsEnable = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.metrics.statement_details.enabled", "collect per-statement query statistics", true,
Expand All @@ -35,6 +36,7 @@ var TxnStatsNumStmtFingerprintIDsToRecord = settings.RegisterIntSetting(

// TxnStatsEnable determines whether to collect per-application transaction
// statistics.
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.
var TxnStatsEnable = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.metrics.transaction_details.enabled", "collect per-application transaction statistics", true,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sqlstats/insights/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,14 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql/appstatspb",
"//pkg/sql/clusterunique",
"//pkg/testutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_kr_pretty//:pretty",
"@com_github_stretchr_testify//require",
],
Expand Down
92 changes: 71 additions & 21 deletions pkg/sql/sqlstats/insights/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,23 @@ type concurrentBufferIngester struct {
eventBuffer *eventBuffer
}

eventBufferCh chan *eventBuffer
opts struct {
// noTimedFlush prevents time-triggered flushes from being scheduled.
noTimedFlush bool
}

eventBufferCh chan eventBufChPayload
registry *lockingRegistry
running uint64
clearRegistry uint32
}

var _ Writer = &concurrentBufferIngester{}
// eventBufChPayload is the message sent over eventBufferCh from the buffer
// guard's flush callback to the ingester's consumer task.
type eventBufChPayload struct {
// clearRegistry, when true, asks the consumer to call Clear on the registry
// after the accompanying events have been ingested.
clearRegistry bool
// events is the buffer being handed off for ingestion; the consumer puts it
// back into eventBufferPool once it has been processed.
events *eventBuffer
}

var _ Writer = (*concurrentBufferIngester)(nil)

// concurrentBufferIngester buffers the "events" it sees (via ObserveStatement
// and ObserveTransaction) and passes them along to the underlying registry
Expand All @@ -60,38 +71,68 @@ type event struct {
statement *Statement
}

func (i *concurrentBufferIngester) Start(ctx context.Context, stopper *stop.Stopper) {
// BufferOpt is a functional option that Start applies to a
// concurrentBufferIngester before launching its background tasks.
type BufferOpt func(i *concurrentBufferIngester)

// WithoutTimedFlush returns a BufferOpt that stops the
// concurrentBufferIngester from scheduling time-triggered flushes to the
// underlying registry. Generally only useful for testing purposes.
func WithoutTimedFlush() BufferOpt {
	disableTimedFlush := func(ing *concurrentBufferIngester) {
		ing.opts.noTimedFlush = true
	}
	return disableTimedFlush
}

func (i *concurrentBufferIngester) Start(
ctx context.Context, stopper *stop.Stopper, opts ...BufferOpt,
) {
for _, opt := range opts {
opt(i)
}
// This task pulls buffers from the channel and forwards them along to the
// underlying registry.
_ = stopper.RunAsyncTask(ctx, "insights-ingester", func(ctx context.Context) {
atomic.StoreUint64(&i.running, 1)

for {
select {
case events := <-i.eventBufferCh:
i.ingest(ctx, events) // note that injest clears the buffer
eventBufferPool.Put(events)
case payload := <-i.eventBufferCh:
i.ingest(ctx, payload.events) // note that ingest clears the buffer
if payload.clearRegistry {
i.registry.Clear(ctx)
}
eventBufferPool.Put(payload.events)
case <-stopper.ShouldQuiesce():
atomic.StoreUint64(&i.running, 0)
return
}
}
})

// This task eagerly flushes partial buffers into the channel, to avoid
// delays identifying insights in low-traffic clusters and tests.
_ = stopper.RunAsyncTask(ctx, "insights-ingester-flush", func(ctx context.Context) {
ticker := time.NewTicker(500 * time.Millisecond)

for {
select {
case <-ticker.C:
i.guard.ForceSync()
case <-stopper.ShouldQuiesce():
ticker.Stop()
return
if !i.opts.noTimedFlush {
// This task eagerly flushes partial buffers into the channel, to avoid
// delays identifying insights in low-traffic clusters and tests.
_ = stopper.RunAsyncTask(ctx, "insights-ingester-flush", func(ctx context.Context) {
ticker := time.NewTicker(500 * time.Millisecond)

for {
select {
case <-ticker.C:
i.guard.ForceSync()
case <-stopper.ShouldQuiesce():
ticker.Stop()
return
}
}
}
})
}
}

// Clear forces a flush of the ingester's buffer and flags that flush so the
// underlying registry is asked to clear any remaining cached data once the
// flushed events have been handled. The registry clear itself happens
// asynchronously, after this method returns.
func (i *concurrentBufferIngester) Clear(_ context.Context) {
	// The flag is raised inside the guard's forced sync so that it is seen by
	// that very flush. The flush function defined on the guard is responsible
	// for setting clearRegistry back to 0.
	requestRegistryClear := func() {
		atomic.StoreUint32(&i.clearRegistry, 1)
	}
	i.guard.ForceSyncExec(requestRegistryClear)
}

Expand Down Expand Up @@ -147,7 +188,7 @@ func newConcurrentBufferIngester(registry *lockingRegistry) *concurrentBufferIng
// in the micro-benchmarks, but further increases had no effect.
// Otherwise, we rely solely on the size of the eventBuffer for
// adjusting our carrying capacity.
eventBufferCh: make(chan *eventBuffer, 1),
eventBufferCh: make(chan eventBufChPayload, 1),
registry: registry,
}

Expand All @@ -157,8 +198,17 @@ func newConcurrentBufferIngester(registry *lockingRegistry) *concurrentBufferIng
return bufferSize
},
func(currentWriterIndex int64) {
clearRegistry := atomic.LoadUint32(&i.clearRegistry) == 1
if clearRegistry {
defer func() {
atomic.StoreUint32(&i.clearRegistry, 0)
}()
}
if atomic.LoadUint64(&i.running) == 1 {
i.eventBufferCh <- i.guard.eventBuffer
i.eventBufferCh <- eventBufChPayload{
clearRegistry: clearRegistry,
events: i.guard.eventBuffer,
}
}
i.guard.eventBuffer = eventBufferPool.Get().(*eventBuffer)
},
Expand Down
62 changes: 62 additions & 0 deletions pkg/sql/sqlstats/insights/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ package insights

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/obs"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -118,6 +123,63 @@ func TestIngester(t *testing.T) {
}
}

// TestIngester_Clear checks that calling Clear on a running ingester leaves
// the guard holding an empty event buffer, even when events had been buffered
// beforehand.
func TestIngester_Clear(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	settings := cluster.MakeTestingClusterSettings()

	t.Run("clears buffer", func(t *testing.T) {
		ctx := context.Background()
		stopper := stop.NewStopper()
		defer stopper.Stop(ctx)

		store := newStore(settings, obs.NoopEventsExporter{})
		detector := &fakeDetector{
			stubEnabled: true,
			stubIsSlow:  true,
		}
		ingester := newConcurrentBufferIngester(newRegistry(settings, detector, store))

		// Timed flushes are switched off so that whatever we write below is
		// guaranteed to still be sitting in the buffer when Clear is called.
		ingester.Start(ctx, stopper, WithoutTimedFlush())

		// The running flag is set from the ingester's async task, so wait
		// for it before writing anything.
		testutils.SucceedsSoon(t, func() error {
			if atomic.LoadUint64(&ingester.running) != 1 {
				return errors.New("ingester not yet started")
			}
			return nil
		})

		// Populate the buffer with a mix of statement and transaction
		// observations across two sessions.
		observations := []testEvent{
			{sessionID: 1, statementID: 10},
			{sessionID: 2, statementID: 20},
			{sessionID: 1, statementID: 11},
			{sessionID: 2, statementID: 21},
			{sessionID: 1, transactionID: 100},
			{sessionID: 2, transactionID: 200},
		}
		for _, ev := range observations {
			if ev.statementID == 0 {
				ingester.ObserveTransaction(ctx, ev.SessionID(), &Transaction{ID: ev.TransactionID()})
				continue
			}
			ingester.ObserveStatement(ev.SessionID(), &Statement{ID: ev.StatementID()})
		}

		// Sanity check: the first slot must be occupied before we clear.
		var zeroEvent event
		require.NotEqual(t, zeroEvent, ingester.guard.eventBuffer[0])

		// Clear is asynchronous, so poll until the first slot reads as the
		// zero event again.
		ingester.Clear(ctx)
		testutils.SucceedsSoon(t, func() error {
			if ingester.guard.eventBuffer[0] == zeroEvent {
				return nil
			}
			return errors.New("eventBuffer not empty")
		})
	})
}

func TestIngester_Disabled(t *testing.T) {
// It's important that we be able to disable all of the insights machinery
// should something go wrong. Here we peek at the internals of the ingester
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sqlstats/insights/insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ type Writer interface {

// ObserveTransaction notifies the registry of the end of a transaction.
ObserveTransaction(ctx context.Context, sessionID clusterunique.ID, transaction *Transaction)

// Clear clears the underlying cache of its contents, with no guarantees around flush behavior.
// Data may simply be erased depending on the implementation.
Clear(ctx context.Context)
}

// WriterProvider offers a Writer.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sqlstats/insights/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ func (n *nullWriter) ObserveStatement(_ clusterunique.ID, _ *Statement) {
func (n *nullWriter) ObserveTransaction(_ context.Context, _ clusterunique.ID, _ *Transaction) {
}

// Clear is a no-op; it exists so that nullWriter satisfies the Writer
// interface.
func (n *nullWriter) Clear(_ context.Context) {
}

var nullWriterInstance Writer = &nullWriter{}
Loading
Loading