insights: flush cache when transaction statistics disabled

A memory leak was found where disabling the cluster setting
`sql.metrics.transaction_details.enabled` prevented the insights
cache from flushing statements. A previous patch partially fixed this
by preventing further statements & transactions from being recorded
to the insights system when the cluster setting was disabled.

This patch implements the other piece of the memory leak fix:
flushing the insights caches when
`sql.metrics.transaction_details.enabled` is disabled. It adds an
onChange listener to the setting, which triggers a new `CacheClearer`
interface implementation that handles the cache clearing.

Fixes: #117300

Release note: none
abarganier committed Jan 12, 2024
1 parent 65f298f commit 298e9b1
Showing 8 changed files with 230 additions and 22 deletions.
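
Before the file-by-file diffs, here is a minimal, self-contained sketch of the pattern the commit message describes: a setting-change listener that clears a cache whenever the setting flips off. The `boolSetting` and `insightsCache` types below are simplified stand-ins for illustration only, not CockroachDB's actual settings or insights APIs:

package main

import (
	"context"
	"fmt"
	"sync"
)

// CacheClearer mirrors the interface the patch introduces: erase cached
// data, with no guarantees about flushing it anywhere first.
type CacheClearer interface {
	Clear(ctx context.Context)
}

// boolSetting is a hypothetical stand-in for a boolean cluster setting
// that supports change callbacks (the real settings package differs).
type boolSetting struct {
	mu       sync.Mutex
	value    bool
	onChange []func(ctx context.Context)
}

func (s *boolSetting) SetOnChange(fn func(ctx context.Context)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.onChange = append(s.onChange, fn)
}

func (s *boolSetting) Get() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value
}

func (s *boolSetting) Set(ctx context.Context, v bool) {
	s.mu.Lock()
	s.value = v
	fns := make([]func(context.Context), len(s.onChange))
	copy(fns, s.onChange)
	s.mu.Unlock()
	for _, fn := range fns { // fire callbacks outside the lock
		fn(ctx)
	}
}

// insightsCache is a toy cache implementing CacheClearer.
type insightsCache struct {
	mu      sync.Mutex
	entries map[string]struct{}
}

func (c *insightsCache) Clear(context.Context) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries = make(map[string]struct{})
}

func main() {
	ctx := context.Background()
	cache := &insightsCache{entries: map[string]struct{}{"stmt-fingerprint": {}}}
	setting := &boolSetting{value: true}

	// The shape of the fix: when the setting is disabled, clear the
	// cache so previously buffered entries cannot leak.
	setting.SetOnChange(func(ctx context.Context) {
		if !setting.Get() {
			cache.Clear(ctx)
		}
	})

	setting.Set(ctx, false)
	fmt.Println(len(cache.entries)) // 0
}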
6 changes: 6 additions & 0 deletions pkg/sql/conn_executor.go
@@ -417,6 +417,12 @@ func NewServer(
	metrics := makeMetrics(false /* internal */)
	serverMetrics := makeServerMetrics(cfg)
	insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics, eventsExporter)
	// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.
	sqlstats.TxnStatsEnable.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
		if !sqlstats.TxnStatsEnable.Get(&cfg.Settings.SV) {
			insightsProvider.CacheClearer().Clear(ctx)
		}
	})
	reportedSQLStats := sslocal.New(
		cfg.Settings,
		sqlstats.MaxMemReportedSQLStatsStmtFingerprints,
3 changes: 3 additions & 0 deletions pkg/sql/sqlstats/insights/BUILD.bazel
@@ -62,11 +62,14 @@ go_test(
        "//pkg/settings/cluster",
        "//pkg/sql/appstatspb",
        "//pkg/sql/clusterunique",
        "//pkg/testutils",
        "//pkg/util/leaktest",
        "//pkg/util/log",
        "//pkg/util/stop",
        "//pkg/util/uint128",
        "//pkg/util/uuid",
        "@com_github_cockroachdb_datadriven//:datadriven",
        "@com_github_cockroachdb_errors//:errors",
        "@com_github_kr_pretty//:pretty",
        "@com_github_stretchr_testify//require",
    ],
51 changes: 43 additions & 8 deletions pkg/sql/sqlstats/insights/ingester.go
@@ -31,12 +31,24 @@ type concurrentBufferIngester struct {
		eventBuffer *eventBuffer
	}

	eventBufferCh chan *eventBuffer
	testKnobs struct {
		// noTimedFlush prevents time-triggered flushes from being scheduled.
		noTimedFlush bool
	}

	eventBufferCh chan eventBufChPayload
	registry      *lockingRegistry
	running       uint64
	clearRegistry uint32
}

type eventBufChPayload struct {
	clearRegistry bool
	events        *eventBuffer
}

var _ Writer = &concurrentBufferIngester{}
var _ Writer = (*concurrentBufferIngester)(nil)
var _ CacheClearer = (*concurrentBufferIngester)(nil)

// concurrentBufferIngester buffers the "events" it sees (via ObserveStatement
// and ObserveTransaction) and passes them along to the underlying registry
@@ -68,9 +80,12 @@ func (i *concurrentBufferIngester) Start(ctx context.Context, stopper *stop.Stop
		for {
			select {
			case events := <-i.eventBufferCh:
				i.ingest(ctx, events) // note that injest clears the buffer
				eventBufferPool.Put(events)
			case payload := <-i.eventBufferCh:
				i.ingest(ctx, payload.events) // note that ingest clears the buffer
				if payload.clearRegistry {
					i.registry.Clear(ctx)
				}
				eventBufferPool.Put(payload.events)
			case <-stopper.ShouldQuiesce():
				atomic.StoreUint64(&i.running, 0)
				return
@@ -86,7 +101,9 @@ func (i *concurrentBufferIngester) Start(ctx context.Context, stopper *stop.Stop
		for {
			select {
			case <-ticker.C:
				i.guard.ForceSync()
				if !i.testKnobs.noTimedFlush {
					i.guard.ForceSync()
				}
			case <-stopper.ShouldQuiesce():
				ticker.Stop()
				return
@@ -95,6 +112,15 @@ func (i *concurrentBufferIngester) Start(ctx context.Context, stopper *stop.Stop
	})
}

// Clear flushes the underlying buffer, and signals the underlying registry
// to clear any remaining cached data afterward. This is an async operation.
func (i *concurrentBufferIngester) Clear(_ context.Context) {
	i.guard.ForceSyncExec(func() {
		// Our flush function defined on the guard is responsible for setting clearRegistry back to 0.
		atomic.StoreUint32(&i.clearRegistry, 1)
	})
}

func (i *concurrentBufferIngester) ingest(ctx context.Context, events *eventBuffer) {
	for idx, e := range events {
		// Because an eventBuffer is a fixed-size array, rather than a slice,
@@ -147,7 +173,7 @@ func newConcurrentBufferIngester(registry *lockingRegistry) *concurrentBufferIng
		// in the micro-benchmarks, but further increases had no effect.
		// Otherwise, we rely solely on the size of the eventBuffer for
		// adjusting our carrying capacity.
		eventBufferCh: make(chan *eventBuffer, 1),
		eventBufferCh: make(chan eventBufChPayload, 1),
		registry:      registry,
	}

@@ -157,8 +183,17 @@ func newConcurrentBufferIngester(registry *lockingRegistry) *concurrentBufferIng
			return bufferSize
		},
		func(currentWriterIndex int64) {
			clearRegistry := atomic.LoadUint32(&i.clearRegistry) == 1
			if clearRegistry {
				defer func() {
					atomic.StoreUint32(&i.clearRegistry, 0)
				}()
			}
			if atomic.LoadUint64(&i.running) == 1 {
				i.eventBufferCh <- i.guard.eventBuffer
				i.eventBufferCh <- eventBufChPayload{
					clearRegistry: clearRegistry,
					events:        i.guard.eventBuffer,
				}
			}
			i.guard.eventBuffer = eventBufferPool.Get().(*eventBuffer)
		},
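The ingester change above couples the clear with a forced flush: `Clear` sets an atomic flag, and the next flush both drains the buffer and tells the channel consumer to also clear the downstream registry, preserving ordering. Here is a minimal sketch of that hand-off with the guard/pool machinery stripped out; `ingester` and `payload` below are simplified illustrations, not the real types:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// payload mirrors eventBufChPayload: the flushed events, plus a flag that
// tells the consumer to also clear its downstream cache afterward.
type payload struct {
	clearDownstream bool
	events          []string
}

type ingester struct {
	pending        []string
	ch             chan payload
	clearRequested uint32 // 1 when a clear is pending; flush resets it to 0
}

// Clear is async: it marks the clear as pending and forces a flush, so the
// buffered events and the clear request travel to the consumer in order.
func (i *ingester) Clear(context.Context) {
	atomic.StoreUint32(&i.clearRequested, 1)
	i.flush()
}

// flush mirrors the guard's flush function in the patch: it reads the flag,
// forwards it with the drained buffer, and resets the flag afterward.
func (i *ingester) flush() {
	doClear := atomic.LoadUint32(&i.clearRequested) == 1
	if doClear {
		defer atomic.StoreUint32(&i.clearRequested, 0)
	}
	i.ch <- payload{clearDownstream: doClear, events: i.pending}
	i.pending = nil
}

func main() {
	in := &ingester{ch: make(chan payload, 1)}
	in.pending = []string{"stmt-1", "stmt-2"}
	in.Clear(context.Background())

	p := <-in.ch // the consumer would ingest p.events, then clear if asked
	fmt.Println(p.clearDownstream, p.events) // true [stmt-1 stmt-2]
}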
85 changes: 85 additions & 0 deletions pkg/sql/sqlstats/insights/ingester_test.go
@@ -12,15 +12,20 @@ package insights

import (
	"context"
	"sync/atomic"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/obs"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/uint128"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/require"
)

@@ -118,6 +123,86 @@ func TestIngester(t *testing.T) {
	}
}

func TestIngester_Clear(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	settings := cluster.MakeTestingClusterSettings()

	t.Run("clears buffer", func(t *testing.T) {
		ctx := context.Background()
		stopper := stop.NewStopper()
		defer stopper.Stop(ctx)
		store := newStore(settings, obs.NoopEventsExporter{})
		ingester := newConcurrentBufferIngester(
			newRegistry(settings, &fakeDetector{
				stubEnabled: true,
				stubIsSlow:  true,
			}, store))
		// Start the ingester and wait for its async tasks to start.
		ingester.Start(ctx, stopper)
		testutils.SucceedsSoon(t, func() error {
			if !(atomic.LoadUint64(&ingester.running) == 1) {
				return errors.New("ingester not yet started")
			}
			return nil
		})
		// Disable timed flushes and fill the ingester's buffer and underlying registry
		// with some data. This sets us up to call Clear() with guaranteed data
		// in both, so we can assert afterward that both have been cleared.
		ingester.testKnobs.noTimedFlush = true
		observe := func(sink Writer, observations []testEvent) {
			for _, o := range observations {
				if o.statementID != 0 {
					sink.ObserveStatement(o.SessionID(), &Statement{ID: o.StatementID()})
				} else {
					sink.ObserveTransaction(ctx, o.SessionID(), &Transaction{ID: o.TransactionID()})
				}
			}
		}
		ingesterObservations := []testEvent{
			{sessionID: 1, statementID: 10},
			{sessionID: 2, statementID: 20},
			{sessionID: 1, statementID: 11},
			{sessionID: 2, statementID: 21},
			{sessionID: 1, transactionID: 100},
			{sessionID: 2, transactionID: 200},
		}
		// Statements are cleared from the registry when a txn with the same
		// sessionID is observed. Make sure the data we add to the registry
		// can't be cleared by any of the ingester's flushed txns. This way,
		// the only way the data can be cleared is via ingester.Clear().
		registryObservations := []testEvent{
			{sessionID: 3, statementID: 31}, // No associated txn.
			{sessionID: 4, statementID: 41}, // No associated txn.
		}
		observe(ingester, ingesterObservations)
		observe(ingester.registry, registryObservations)
		empty := event{}
		require.NotEqual(t, empty, ingester.guard.eventBuffer[0])
		func() {
			ingester.registry.mu.Lock()
			defer ingester.registry.mu.Unlock()
			require.NotEmpty(t, ingester.registry.mu.statements)
		}()
		// Now, call Clear() to verify it clears the buffer and the
		// underlying registry. Use SucceedsSoon for assertions, since
		// the operation is async.
		ingester.Clear(ctx)
		testutils.SucceedsSoon(t, func() error {
			if ingester.guard.eventBuffer[0] != empty {
				return errors.New("eventBuffer not empty")
			}

			ingester.registry.mu.Lock()
			defer ingester.registry.mu.Unlock()
			if len(ingester.registry.mu.statements) > 0 {
				return errors.Newf("registry not empty, size: %v", len(ingester.registry.mu.statements))
			}
			return nil
		})
	})
}

func TestIngester_Disabled(t *testing.T) {
	// It's important that we be able to disable all of the insights machinery
	// should something go wrong. Here we peek at the internals of the ingester
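Because `Clear` is asynchronous, the test above polls with `testutils.SucceedsSoon` rather than asserting once. For readers unfamiliar with that helper, here is a rough approximation of the retry loop it provides; the real helper integrates with the test's fatal machinery and backs off exponentially, so this standalone version is illustrative only:

package main

import (
	"errors"
	"fmt"
	"time"
)

// succeedsSoon retries fn until it returns nil or the timeout elapses —
// an approximation of testutils.SucceedsSoon, for illustration only.
func succeedsSoon(timeout time.Duration, fn func() error) error {
	deadline := time.Now().Add(timeout)
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("condition never met: %w", err)
		}
		time.Sleep(10 * time.Millisecond) // the real helper backs off exponentially
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // simulate an async clear completing
		close(done)
	}()

	err := succeedsSoon(5*time.Second, func() error {
		select {
		case <-done:
			return nil
		default:
			return errors.New("not cleared yet")
		}
	})
	fmt.Println(err) // <nil>
}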
13 changes: 13 additions & 0 deletions pkg/sql/sqlstats/insights/insights.go
@@ -156,6 +156,15 @@ type Writer interface {
	ObserveTransaction(ctx context.Context, sessionID clusterunique.ID, transaction *Transaction)
}

// CacheClearer provides methods to erase data from a cache. It provides no guarantees
// when it comes to flushing the contents of the cache - they may be simply erased if
// the underlying implementation chooses to do so.
type CacheClearer interface {
	// Clear clears the cache of its contents, with no guarantees around flush behavior.
	// Data may simply be erased depending on the implementation.
	Clear(ctx context.Context)
}

// WriterProvider offers a Writer.
// Pass true for internal when called by the internal executor.
type WriterProvider func(internal bool) Writer
@@ -191,6 +200,10 @@ type Provider interface {
	// LatencyInformation returns an object that offers read access to latency information,
	// such as percentiles.
	LatencyInformation() LatencyInformation

	// CacheClearer clears all cached insights data that has yet to be written to the data store
	// provided by Reader().
	CacheClearer() CacheClearer
}

// New builds a new Provider.
4 changes: 4 additions & 0 deletions pkg/sql/sqlstats/insights/provider.go
@@ -45,6 +45,10 @@ func (p *defaultProvider) LatencyInformation() LatencyInformation {
	return p.anomalyDetector
}

func (p *defaultProvider) CacheClearer() CacheClearer {
	return p.ingester
}

type nullWriter struct{}

func (n *nullWriter) ObserveStatement(_ clusterunique.ID, _ *Statement) {
50 changes: 36 additions & 14 deletions pkg/sql/sqlstats/insights/registry.go
@@ -19,29 +19,42 @@ import (
	"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
	"github.com/cockroachdb/cockroach/pkg/util/intsets"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/redact"
)

// This registry is the central object in the insights subsystem. It observes
// statement execution to determine which statements are outliers and
// writes insights into the provided sink.
type lockingRegistry struct {
	statements map[clusterunique.ID]*statementBuf
	detector   detector
	causes     *causes
	sink       sink
	mu struct {
		syncutil.Mutex
		statements map[clusterunique.ID]*statementBuf
	}
	detector detector
	causes   *causes
	sink     sink
}

var _ Writer = &lockingRegistry{}
var _ Writer = (*lockingRegistry)(nil)
var _ CacheClearer = (*lockingRegistry)(nil)

func (r *lockingRegistry) Clear(_ context.Context) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.statements = make(map[clusterunique.ID]*statementBuf)
}

func (r *lockingRegistry) ObserveStatement(sessionID clusterunique.ID, statement *Statement) {
	if !r.enabled() {
		return
	}
	b, ok := r.statements[sessionID]
	r.mu.Lock()
	defer r.mu.Unlock()
	b, ok := r.mu.statements[sessionID]
	if !ok {
		b = statementsBufPool.Get().(*statementBuf)
		r.statements[sessionID] = b
		r.mu.statements[sessionID] = b
	}
	b.append(statement)
}
@@ -99,11 +112,19 @@ func (r *lockingRegistry) ObserveTransaction(
	if transaction.ID.String() == "00000000-0000-0000-0000-000000000000" {
		return
	}
	statements, ok := r.statements[sessionID]
	statements, ok := func() (*statementBuf, bool) {
		r.mu.Lock()
		defer r.mu.Unlock()
		statements, ok := r.mu.statements[sessionID]
		if !ok {
			return nil, false
		}
		delete(r.mu.statements, sessionID)
		return statements, true
	}()
	if !ok {
		return
	}
	delete(r.statements, sessionID)
	defer statements.release()

	// Mark statements which are detected as slow or have a failed status.
@@ -200,12 +221,13 @@ func (r *lockingRegistry) enabled() bool {
}

func newRegistry(st *cluster.Settings, detector detector, sink sink) *lockingRegistry {
	return &lockingRegistry{
		statements: make(map[clusterunique.ID]*statementBuf),
		detector:   detector,
		causes:     &causes{st: st},
		sink:       sink,
	lr := &lockingRegistry{
		detector: detector,
		causes:   &causes{st: st},
		sink:     sink,
	}
	lr.mu.statements = make(map[clusterunique.ID]*statementBuf)
	return lr
}

func (s *Statement) CopyTo(
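The registry change follows a common CockroachDB locking convention: the fields a mutex protects live inside an embedded `mu` struct together with the mutex itself, making the protection boundary explicit in every access. A minimal sketch of the idiom, using the standard library's `sync.Mutex` in place of `syncutil.Mutex` and simplified field types:

package main

import (
	"fmt"
	"sync"
)

// registry groups lock-protected state under mu, mirroring the shape of
// lockingRegistry after the patch.
type registry struct {
	mu struct {
		sync.Mutex
		statements map[string][]string // sessionID -> buffered statements
	}
}

func newRegistry() *registry {
	r := &registry{}
	r.mu.statements = make(map[string][]string)
	return r
}

func (r *registry) observeStatement(sessionID, stmt string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.statements[sessionID] = append(r.mu.statements[sessionID], stmt)
}

// clear swaps in a fresh map under the lock, as lockingRegistry.Clear does.
func (r *registry) clear() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.statements = make(map[string][]string)
}

func main() {
	r := newRegistry()
	r.observeStatement("session-1", "SELECT 1")
	r.clear()

	r.mu.Lock()
	fmt.Println(len(r.mu.statements)) // 0
	r.mu.Unlock()
}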
